import { MysqlError } from "mysqljs";
import { Maybe } from "util/maybe";
import {
    ClusterStatisticsAction,
    ClusterStatisticsPayload,
    MemsqlVersionAction,
    MemsqlPartitionsAction,
    MemsqlPartitionsPayload,
} from "data/actions";
import {
    SQLClusterStatusRow,
    SQLDistributedPartitionsRow,
    SQLDistributedDatabasesRow,
    SQLRowsThroughputRow,
    SQLClusterTroughputEnabledRow,
} from "worker/api/cluster-metadata-queries";

import {
    Node,
    Partition,
    PartitionInstance,
    RowsThroughputPayload,
} from "data/models";

import _ from "lodash";
import BigNumber from "vendor/bignumber.js/bignumber";

import { fetchMemsqlVersion } from "worker/api/version-query";
import {
    buildClusterStatisticsAction,
    RowsThroughputAction,
    RowsThroughputEnabledAction,
} from "data/actions/cluster-metadata";

import { HandlerContext } from "worker/api";
import { Connection } from "mysqljs";

import { Observable } from "rxjs";
import { makeActionCreator } from "worker/api/helpers";
import { fetchTopologyData, buildNodes } from "worker/api/topology";

import { differenceInSeconds } from "date-fns";
import { select, multiSelect } from "util/query";
import { nullableToMaybe } from "util/nullable";
import { Version } from "util/version";
import { logError } from "util/logging";

import {
    SQL_DISTRIBUTED_PARTITIONS,
    SQL_MV_CLUSTER_STATUS,
    SQL_DISTRIBUTED_DATABASES,
    SQL_CLUSTER_THROUGHPUT,
    SQL_CLUSTER_THROUGHPUT_ENABLED,
} from "worker/api/cluster-metadata-queries";

// Query that reads the `redundancy_level` variable. `fetchClusterStatistics`
// expects it to produce exactly one row.
const SQL_STATISTICS_HA = `
    SHOW VARIABLES LIKE 'redundancy_level'
`;

// Shape of a row produced by `SQL_STATISTICS_HA`. `Value` is typed as a string
// and is converted with `Number(...)` by `fetchClusterStatistics`.
type SQLStatisticsHA = {
    Value: string;
};

// Runs `SQL_STATISTICS_HA` on `conn` and resolves with a payload whose
// `haEnabled` flag is true when the reported value, read as a number, is 2 or
// more. The returned promise rejects when the query does not yield exactly
// one row.
function fetchClusterStatistics(
    conn: Connection
): Promise<ClusterStatisticsPayload> {
    const toPayload = (haRows: Array<SQLStatisticsHA>) => {
        if (haRows.length !== 1) {
            throw new Error(
                "The SQL Statistics HA query should only return one row"
            );
        }

        const [{ Value: redundancyLevel }] = haRows;
        const haEnabled = Number(redundancyLevel) >= 2; // boolean

        return { haEnabled };
    };

    return select(conn, SQL_STATISTICS_HA).then(toPayload);
}

// Action creator for the cluster statistics. The handler merges a "loading"
// action with the outcome of `fetchClusterStatistics`. Any failure, including
// failing to obtain a pooled connection, is logged and turned into an error
// action carrying the error message.
export const queryClusterStatistics = makeActionCreator({
    name: "queryClusterStatistics",

    handle: (ctx: HandlerContext): Observable<ClusterStatisticsAction> => {
        const loading$ = Observable.of<ClusterStatisticsAction>({
            type: "CLUSTER_STATISTICS",
            error: false,
            payload: { loading: true },
        });

        // Logs the failure and converts it into the error form of the action.
        const toErrorAction = (
            err: Error | MysqlError
        ): ClusterStatisticsAction => {
            logError(err);

            return {
                type: "CLUSTER_STATISTICS",
                error: true,
                payload: err.message,
            };
        };

        const statistics = ctx.manager
            .getPooledConnection()
            .then(conn => {
                // The connection is released whether or not the query succeeds.
                const releaseConnection = () => conn.release();

                return fetchClusterStatistics(conn)
                    .then(buildClusterStatisticsAction)
                    .finally(releaseConnection);
            })
            .catch(toErrorAction);

        return Observable.merge(loading$, Observable.fromPromise(statistics));
    },
});

// Wraps a resolved version in the successful, no-longer-loading form of the
// MEMSQL_VERSION action.
const createMemsqlVersionAction = (data: Version): MemsqlVersionAction => ({
    type: "MEMSQL_VERSION",
    error: false,
    payload: { loading: false, data },
});

// This action is *only* dispatched on app load, by the `ClusterLayout`
// component.
//
// The handler merges a "loading" action with the outcome of
// `fetchMemsqlVersion`. Any failure, including failing to obtain a pooled
// connection, is logged and turned into an error action whose payload carries
// the error message.
export const queryMemsqlVersion = makeActionCreator({
    name: "queryMemsqlVersion",

    handle: (ctx: HandlerContext): Observable<MemsqlVersionAction> => {
        const loading$ = Observable.of<MemsqlVersionAction>({
            type: "MEMSQL_VERSION",
            error: false,
            payload: { loading: true },
        });

        // Logs the failure and converts it into the error form of the action.
        const toErrorAction = (
            err: Error | MysqlError
        ): MemsqlVersionAction => {
            logError(err);

            return {
                type: "MEMSQL_VERSION",
                error: true,
                payload: {
                    message: err.message,
                },
            };
        };

        const versionAction = ctx.manager
            .getPooledConnection()
            .then(conn => {
                // The connection is released whether or not the query succeeds.
                const releaseConnection = () => conn.release();

                return fetchMemsqlVersion(conn)
                    .then(createMemsqlVersionAction)
                    .finally(releaseConnection);
            })
            .catch(toErrorAction);

        return Observable.merge(
            loading$,
            Observable.fromPromise(versionAction)
        );
    },
});

// Reference partitions are synchronously replicated on MemSQL versions
// >= 6.5 and < 7; from version 7 onwards they are asynchronously replicated.
// Only the upper bound is checked here: any version below 7.0.0 is reported
// as synchronously replicated.
function areRefPartitionsSyncReplicated(version: Version) {
    const firstAsyncVersion = new Version([7, 0, 0]);
    const isBelowSeven = version.lt(firstAsyncVersion);

    return isBelowSeven;
}

// Issues the three partition-related queries on `conn` through a single
// `multiSelect` call, in this order: cluster status, distributed partitions,
// distributed databases.
function fetchPartitionData(conn: Connection) {
    const clusterStatusQuery = { sql: SQL_MV_CLUSTER_STATUS };
    const distributedPartitionsQuery = { sql: SQL_DISTRIBUTED_PARTITIONS };
    const distributedDatabasesQuery = { sql: SQL_DISTRIBUTED_DATABASES };

    return multiSelect<
        SQLClusterStatusRow,
        SQLDistributedPartitionsRow,
        SQLDistributedDatabasesRow
    >(
        conn,
        clusterStatusQuery,
        distributedPartitionsQuery,
        distributedDatabasesQuery
    );
}

// Builds the dotted `name.host.port` key under which a partition instance is
// looked up among the MV_CLUSTER_STATUS rows.
function partitionStatusKey(
    name: unknown,
    host: unknown,
    port: unknown
): string {
    return [name, host, port].join(".");
}

// Derives a partition instance's state: the state of its status row (or
// "Unknown" when that row carries none), or "offline" when it has no status
// row at all.
function partitionStateFrom(
    partitionStatus: Maybe<SQLClusterStatusRow>
): PartitionInstance["state"] {
    if (!partitionStatus) {
        return "offline";
    }

    return partitionStatus.state || "Unknown";
}

// Translates the role reported for a data partition instance, marking
// masters and slaves as "detached" when the instance is flagged offline. Any
// other role is passed through `nullableToMaybe` unchanged.
function parseDataPartitionRole(
    role: SQLDistributedPartitionsRow["role"],
    isDetached: boolean
): PartitionInstance["role"] {
    switch (role) {
        case "Master":
            return isDetached ? "detached master" : "master";

        case "Slave":
            return isDetached ? "detached slave" : "slave";

        default:
            return nullableToMaybe(role);
    }
}

// Derives the role of a reference partition instance from the node hosting
// it: the master aggregator holds the master copy, every other node holds a
// slave copy that counts as detached unless the node is online.
function referencePartitionRole(node: Node): PartitionInstance["role"] {
    if (node.role === "MASTER_AGGREGATOR") {
        return "master";
    }

    return node.state === "online" ? "slave" : "detached slave";
}

// Combines the raw query results into the partitions payload (everything
// except `deltaTimeS`).
//
// - `partitions` gets, per database, one reference partition followed by one
//   data partition for each ordinal in `[0, numPartitions)`.
// - `partitionInstances` gets one entry per distributed-partitions row,
//   followed by one reference partition instance per (database, node) pair.
//
// Throws when a distributed-partitions row has a `null` host or port.
export function getMemsqlPartitionsData({
    sqlClusterStatus,
    sqlDistributedPartitions,
    sqlDistributedDatabases,
    nodes,
    version,
}: {
    sqlClusterStatus: Array<SQLClusterStatusRow>;
    sqlDistributedPartitions: Array<SQLDistributedPartitionsRow>;
    sqlDistributedDatabases: Array<SQLDistributedDatabasesRow>;
    nodes: Array<Node>;
    version: Version;
}): Omit<MemsqlPartitionsPayload, "deltaTimeS"> {
    const partitions: Array<Partition> = [];

    // DR-replica flag per database name, so that data partition instances do
    // not have to search the database list once per row. The first row for a
    // name wins, which is what a first-match search over the list returns.
    const drReplicaByDatabase = new Map<
        SQLDistributedDatabasesRow["databaseName"],
        boolean
    >();

    // Create partition objects.
    _.forEach(
        sqlDistributedDatabases,
        ({ databaseName, numPartitions, drReplica, syncReplicated }) => {
            const isDrReplica = drReplica.eq(1);

            if (!drReplicaByDatabase.has(databaseName)) {
                drReplicaByDatabase.set(databaseName, isDrReplica);
            }

            partitions.push({
                kind: "reference",
                databaseName,
                drReplica: isDrReplica,
                syncReplicated: areRefPartitionsSyncReplicated(version),
            });

            for (
                let ordinal = new BigNumber(0);
                ordinal.lt(numPartitions);
                ordinal = ordinal.plus(1)
            ) {
                partitions.push({
                    kind: "data",
                    databaseName,
                    ordinal,
                    drReplica: isDrReplica,
                    syncReplicated: syncReplicated.eq(1),
                });
            }
        }
    );

    // Maps partition keys (`db_ordinal.host.port` for data partitions,
    // `db.host.port` for reference partitions) to the matching row of
    // MV_CLUSTER_STATUS. A later row with the same key replaces an earlier one.
    const statusByKey = new Map<string, SQLClusterStatusRow>();

    _.forEach(sqlClusterStatus, statusRow => {
        statusByKey.set(
            partitionStatusKey(
                statusRow.databaseName,
                statusRow.host,
                statusRow.port
            ),
            statusRow
        );
    });

    const partitionInstances: Array<PartitionInstance> = [];

    // Create data partition instance objects.
    _.forEach(
        sqlDistributedPartitions,
        ({ databaseName, ordinal, host, port, role, isOffline, syncState }) => {
            // NOTE(review): these messages name `distributed_databases`
            // although the rows iterated here are the distributed-partitions
            // rows; the strings are kept as they are, confirm before changing.
            if (host === null) {
                throw new Error(
                    "Host should not be null in information_schema.distributed_databases"
                );
            }

            if (port === null) {
                throw new Error(
                    "Port should not be null in information_schema.distributed_databases"
                );
            }

            // Try to find this partition instance in MV_CLUSTER_STATUS.
            const partitionStatus = statusByKey.get(
                partitionStatusKey(`${databaseName}_${ordinal}`, host, port)
            );

            const isDetached = Boolean(isOffline && isOffline.eq(1));

            partitionInstances.push({
                ordinal,
                databaseName,
                host,
                port,
                position: partitionStatus
                    ? nullableToMaybe(partitionStatus.position)
                    : undefined,
                role: parseDataPartitionRole(role, isDetached),
                state: partitionStateFrom(partitionStatus),
                details: (partitionStatus && partitionStatus.details) || "",
                syncState: syncState || undefined,
                // Stays undefined when the row's database is not listed.
                drReplica: drReplicaByDatabase.get(databaseName),
            });
        }
    );

    // Create reference partition instance objects.
    _.forEach(sqlDistributedDatabases, ({ databaseName, drReplica }) => {
        _.forEach(nodes, node => {
            const partitionStatus = statusByKey.get(
                partitionStatusKey(databaseName, node.host, node.port)
            );

            partitionInstances.push({
                databaseName,
                host: node.host,
                port: node.port,
                ordinal: undefined,
                role: referencePartitionRole(node),
                position: partitionStatus
                    ? nullableToMaybe(partitionStatus.position)
                    : undefined,
                state: partitionStateFrom(partitionStatus),
                details: (partitionStatus && partitionStatus.details) || "",

                syncState: areRefPartitionsSyncReplicated(version)
                    ? "sync"
                    : "async",

                drReplica: drReplica.eq(1),
            });
        });
    });

    return {
        partitions,
        partitionInstances,
    };
}

// Action creator for the partitions view. The handler merges a "loading"
// action with the outcome of the following steps, all run on one pooled
// connection and in this order: fetch the MemSQL version, fetch the topology,
// build the nodes, fetch the partition query results, and combine them with
// `getMemsqlPartitionsData`.
//
// `deltaTimeS` is the number of seconds between the moment the handler was
// invoked (before a connection was obtained) and the moment the data was
// ready.
//
// Any failure, including failing to obtain a pooled connection, is logged and
// turned into an error action carrying the error message, so the returned
// observable never surfaces these failures as an observable error.
export const queryPartitionData = makeActionCreator({
    name: "queryPartitionData",

    handle: (ctx: HandlerContext): Observable<MemsqlPartitionsAction> => {
        const $loading = Observable.of<MemsqlPartitionsAction>({
            type: "MEMSQL_PARTITIONS",
            error: false,
            payload: { loading: true },
        });

        const startDate = new Date();

        const $compute = Observable.fromPromise(
            ctx.manager
                .getPooledConnection()
                .then(conn =>
                    // `conn` is in scope for every step below, so only the
                    // intermediate results are carried along the chain.
                    fetchMemsqlVersion(conn)
                        .then(version =>
                            Promise.all([version, fetchTopologyData(conn)])
                        )
                        .then(([version, [rawLeaves, rawAggregators]]) =>
                            Promise.all([
                                version,
                                buildNodes([rawLeaves, rawAggregators]),
                            ])
                        )
                        .then(([version, nodes]) =>
                            Promise.all([
                                version,
                                nodes,
                                fetchPartitionData(conn),
                            ])
                        )
                        .then(
                            ([
                                version,
                                nodes,
                                [
                                    sqlClusterStatus,
                                    sqlDistributedPartitions,
                                    sqlDistributedDatabases,
                                ],
                            ]) =>
                                getMemsqlPartitionsData({
                                    sqlClusterStatus,
                                    sqlDistributedPartitions,
                                    sqlDistributedDatabases,
                                    nodes,
                                    version,
                                })
                        )
                        .then(
                            (partitionsPayload): MemsqlPartitionsAction => ({
                                type: "MEMSQL_PARTITIONS",
                                error: false,
                                payload: {
                                    loading: false,
                                    data: {
                                        ...partitionsPayload,
                                        deltaTimeS: differenceInSeconds(
                                            new Date(),
                                            startDate
                                        ),
                                    },
                                },
                            })
                        )
                        // Release the connection whether or not a step failed.
                        .finally(() => conn.release())
                )
                // Attached to the outer chain so that a rejected
                // `getPooledConnection()` is handled here as well.
                .catch(
                    (err: Error | MysqlError): MemsqlPartitionsAction => {
                        logError(err);

                        return {
                            type: "MEMSQL_PARTITIONS",
                            error: true,
                            payload: err.message,
                        };
                    }
                )
        );

        return Observable.merge($loading, $compute);
    },
});

// Collapses the rows-throughput query result into a single payload.
//
// Returns `undefined` when there are no rows. Otherwise both counters start
// at zero and are overwritten by the value of the row named
// "Rows_returned_by_reads" / "Rows_affected_by_writes" (the last such row
// wins); rows with any other name are ignored.
export function mapSQLRowsThroughput(
    sqlRowsThroughputProps: Array<SQLRowsThroughputRow>
): Maybe<RowsThroughputPayload> {
    if (!sqlRowsThroughputProps || !(sqlRowsThroughputProps.length > 0)) {
        return undefined;
    }

    let rowsAffectedByWrites = new BigNumber(0);
    let rowsReturnedByReads = new BigNumber(0);

    for (const { variableName, variableValue } of sqlRowsThroughputProps) {
        switch (variableName) {
            case "Rows_returned_by_reads":
                rowsReturnedByReads = variableValue;
                break;

            case "Rows_affected_by_writes":
                rowsAffectedByWrites = variableValue;
                break;
        }
    }

    return {
        rowsAffectedByWrites,
        rowsReturnedByReads,
        // We can assume the readTime from the first result, since they are all the same
        readTime: sqlRowsThroughputProps[0].readTime,
    };
}

// Runs the cluster throughput query on `conn` and maps its rows with
// `mapSQLRowsThroughput`.
function fetchRowsThroughput(conn: Connection) {
    const throughputRows = select(conn, SQL_CLUSTER_THROUGHPUT);

    return throughputRows.then(mapSQLRowsThroughput);
}

// Wraps a throughput payload in the successful, no-longer-loading form of the
// ROWS_THROUGHPUT action.
const createRowsThroughputAction = (
    data: RowsThroughputPayload
): RowsThroughputAction => ({
    type: "ROWS_THROUGHPUT",
    error: false,
    payload: { loading: false, data },
});

// Action creator for the rows-throughput metrics. Unlike the other query
// handlers in this file it emits no "loading" action: the observable carries
// only the outcome of `fetchRowsThroughput`.
//
// A missing result (nothing could be mapped) and any failure, including
// failing to obtain a pooled connection, are logged and turned into an error
// action carrying the message.
export const queryRowsThroughput = makeActionCreator({
    name: "queryRowsThroughput",

    handle: (ctx: HandlerContext): Observable<RowsThroughputAction> => {
        // Builds the error form of the action for the given message.
        const errorAction = (message: string): RowsThroughputAction => ({
            type: "ROWS_THROUGHPUT",
            error: true,
            payload: message,
        });

        const toAction = (
            data: Maybe<RowsThroughputPayload>
        ): RowsThroughputAction => {
            if (!data) {
                const errorMessage =
                    "Unable to parse rows throughput metrics in MV_GLOBAL_STATUS";

                logError(new Error(errorMessage));
                return errorAction(errorMessage);
            }

            return createRowsThroughputAction(data);
        };

        const throughputAction = ctx.manager
            .getPooledConnection()
            .then(conn => {
                // The connection is released whether or not the query succeeds.
                const releaseConnection = () => conn.release();

                return fetchRowsThroughput(conn)
                    .then(toAction)
                    .finally(releaseConnection);
            })
            .catch(
                (err: Error | MysqlError): RowsThroughputAction => {
                    logError(err);

                    return errorAction(err.message);
                }
            );

        return Observable.fromPromise(throughputAction);
    },
});

// Only engines after 6.5 support this feature. Once 6.5 is deprecated in
// Studio, this code and all the checks related to `isRowsTroughputEnabled`
// can safely be removed.
//
// The handler resolves with a ROWS_THROUGHPUT_ENABLED action whose payload is
// true when the single row returned by the query has `isRowsTroughputEnabled`
// equal to 1. This is a best-effort check: every failure (no pooled
// connection, a failing query, an unexpected row count, a failing release) is
// logged and reported as `payload: false` with `error: false`, so the
// returned promise does not reject for any of them.
export const isRowsThroughputEnabled = makeActionCreator({
    name: "isRowsThroughputEnabled",

    handle(ctx: HandlerContext): Promise<RowsThroughputEnabledAction> {
        return ctx.manager
            .getPooledConnection()
            .then(conn =>
                select(conn, SQL_CLUSTER_THROUGHPUT_ENABLED)
                    .then(
                        (
                            rows: Array<SQLClusterTroughputEnabledRow>
                        ): RowsThroughputEnabledAction => {
                            if (rows.length !== 1) {
                                throw new Error(
                                    "The Rows Throughput Enabled query should only return one row"
                                );
                            }

                            return {
                                type: "ROWS_THROUGHPUT_ENABLED",
                                error: false,
                                payload: rows[0].isRowsTroughputEnabled.eq(1),
                            };
                        }
                    )
                    // Release the connection whether or not the query succeeded.
                    .finally(() => conn.release())
            )
            // Attached to the outer chain so that a rejected
            // `getPooledConnection()` is treated like any other failure.
            .catch(
                (err: MysqlError | Error): RowsThroughputEnabledAction => {
                    logError(err);

                    return {
                        type: "ROWS_THROUGHPUT_ENABLED",
                        error: false,
                        payload: false,
                    };
                }
            );
    },
});
