import { MysqlError } from "mysqljs";
import { HandlerContext } from "worker/api";
import {
    TopologyAction,
    NodesLiveMonitoringAction,
    NodesLiveMonitoringStopAction,
} from "data/actions";
import { Leaf, Aggregator, Node, NodeLiveMonitoringMetrics } from "data/models";
import { Connection } from "mysqljs";
import {
    SQLLeavesRow,
    SQLAggregatorsRow,
    SQLNodesLiveCPURow,
    SQLNodesLiveMemoryRow,
    SQLNodesLiveDiskRow,
} from "worker/api/topology-queries";
import { Maybe } from "util/maybe";
import {
    LoadingError,
    LEError,
    LELoading,
    LESuccess,
} from "util/loading-error";

import _ from "lodash";
import { Observable } from "rxjs";
import BigNumber from "vendor/bignumber.js/bignumber";

import {
    SQL_LEAVES_QUERY,
    SQL_AGGREGATORS_QUERY,
    SQL_NODES_LIVE_CPU,
    SQL_NODES_LIVE_MEMORY,
    SQL_NODES_LIVE_DISK,
} from "worker/api/topology-queries";
import { getEmptyNodeLiveMonitoringMetrics } from "data/models";

import { differenceInSeconds } from "date-fns";
import { makeActionCreator } from "worker/api/helpers";
import { multiSelect } from "util/query";
import { logError } from "util/logging";
import {
    buildTopologyAction,
    NodesLiveMonitoringActionPayload,
} from "data/actions/topology";
import { nullableToMaybe } from "util/nullable";
import * as logging from "util/logging";

export function buildNodes([rawLeaves, rawAggregators]: [
    Array<SQLLeavesRow>,
    Array<SQLAggregatorsRow>
]) {
    const leaves = _.map(
        rawLeaves,
        (rawLeaf: SQLLeavesRow): Leaf => ({
            host: rawLeaf.host,
            port: rawLeaf.port,
            state: rawLeaf.state,
            openedConnections: rawLeaf.openedConnections,
            averageLatency: rawLeaf.averageLatency || undefined,
            availabilityGroup: rawLeaf.availabilityGroup,
            role: "LEAF",
            pairHost: rawLeaf.pairHost,
            pairPort: nullableToMaybe(rawLeaf.pairPort),

            liveMonitoring: getEmptyNodeLiveMonitoringMetrics(),
        })
    );

    const aggregators = _.map(
        rawAggregators,
        (rawAggregator: SQLAggregatorsRow): Aggregator => ({
            host: rawAggregator.host,
            port: rawAggregator.port,
            state: rawAggregator.state,
            openedConnections: rawAggregator.openedConnections,
            averageLatency: rawAggregator.averageLatency || undefined,
            role: rawAggregator.masterAggregator.eq(1)
                ? "MASTER_AGGREGATOR"
                : "AGGREGATOR",

            liveMonitoring: getEmptyNodeLiveMonitoringMetrics(),
        })
    );

    return _.concat<Node>(leaves, aggregators);
}

export const fetchTopologyData = (conn: Connection) => {
    return multiSelect<SQLLeavesRow, SQLAggregatorsRow>(
        conn,
        { sql: SQL_LEAVES_QUERY },
        { sql: SQL_AGGREGATORS_QUERY }
    );
};

export const queryTopology = makeActionCreator({
    name: "queryTopology",

    handle: (ctx: HandlerContext): Observable<TopologyAction> => {
        const $loading = Observable.of<TopologyAction>({
            type: "TOPOLOGY",
            error: false,
            payload: { loading: true },
        });

        const startDate = new Date();

        const $compute = Observable.fromPromise<TopologyAction>(
            ctx.manager
                .getPooledConnection()
                .then(conn =>
                    fetchTopologyData(conn)
                        .then(buildNodes)
                        .then(nodes =>
                            buildTopologyAction({
                                nodes,
                                deltaTimeS: differenceInSeconds(
                                    new Date(),
                                    startDate
                                ),
                            })
                        )
                        .finally(() => conn.release())
                )
                .catch(
                    (err: Error | MysqlError): TopologyAction => {
                        logError(err);

                        return {
                            type: "TOPOLOGY",
                            error: true,
                            payload: err.message,
                        };
                    }
                )
        );

        return Observable.merge($loading, $compute);
    },
});

type LiveMonitoringQueryResults = {
    sqlNodesLiveCPU: Array<SQLNodesLiveCPURow>;
    sqlNodesLiveMemory: Array<SQLNodesLiveMemoryRow>;
    sqlNodesLiveDisk: Array<SQLNodesLiveDiskRow>;
};

// This function calculates the CPU usage for a given node given the results for
// the previous and the current mv_sysinfo_cpu query.
// ** This function has a few EARLY RETURNS and throws some errors to handle
// some special cases. **
function calculateNodeCpuUsagePercent(
    previous: Maybe<SQLNodesLiveCPURow>,
    current: SQLNodesLiveCPURow
): LoadingError<number> {
    let nodeCpuUsage;
    let nodeCpuLimit;

    if (current.memsqlTotalUsedCumulativeNs === null) {
        throw new Error("current.totalUsedCumulativeNs should not be null");
    }

    if (current.timestampNs === null) {
        throw new Error("current.timestampNs should not be null");
    }

    if (previous) {
        if (previous.memsqlTotalUsedCumulativeNs === null) {
            throw new Error(
                "previous.totalUsedCumulativeNs should not be null"
            );
        }

        if (previous.timestampNs === null) {
            throw new Error("previous.timestampNs should not be null");
        }

        if (current.numCpus === null) {
            throw new Error("current.numCpus should not be null");
        }

        // nodeCpuUsage is defined in number of cpu seconds per second
        nodeCpuUsage = current.memsqlTotalUsedCumulativeNs
            .minus(previous.memsqlTotalUsedCumulativeNs)
            .dividedBy(current.timestampNs.minus(previous.timestampNs));

        nodeCpuLimit = current.numCpus;

        // This node's CPU usage is being managed via a cgroup
        if (current.cgroupQuotaNs) {
            if (current.cgroupPeriodNs === null) {
                throw new Error(
                    "sqlNodesLiveCPU.cgroupPeriodNs should not be null"
                );
            }

            // If there is a previous value for cgroupQuotaNs and it is
            // different from the current one, we make `totalCpuUsagePercent`
            // undefined since a valid comparison cannot be made. An example
            // problematic case would be if the quota increases, then the delta
            // cpu time could be higher than the previous quota would have
            // allowed and we'd get a value over 100%.
            if (
                previous.cgroupQuotaNs !== null &&
                !current.cgroupQuotaNs.eq(previous.cgroupQuotaNs)
            ) {
                return new LELoading();
            } else {
                // If the quota forms a tighter bound on CPU than the number of
                // physical CPUs, use that bound as the limit instead.
                nodeCpuLimit = BigNumber.min(
                    nodeCpuLimit,
                    current.cgroupQuotaNs.dividedBy(current.cgroupPeriodNs)
                );
            }
        }
    } else {
        // If there are previous results, but not for this specific node, we
        // have an unknown CPU value.
        logging.log(
            "error",
            `Found no previous CPU usage value for node ${current.nodeAddress}.`
        );

        return new LEError();
    }

    if (nodeCpuLimit === undefined) {
        return new LESuccess(nodeCpuUsage.toNumber());
    } else {
        return new LESuccess(nodeCpuUsage.dividedBy(nodeCpuLimit).toNumber());
    }
}

function buildNodesLiveMonitoringAction({
    results: { sqlNodesLiveCPU, sqlNodesLiveMemory, sqlNodesLiveDisk },
    previousResults,
}: {
    results: LiveMonitoringQueryResults;
    previousResults: Maybe<LiveMonitoringQueryResults>;
}): NodesLiveMonitoringAction {
    const payload: NodesLiveMonitoringActionPayload = {};

    // This function updates the payload for a given node address.
    function updatePayload(
        nodeAddress: string,
        newPartialPayload: Partial<NodeLiveMonitoringMetrics>
    ) {
        payload[nodeAddress] = {
            ...payload[nodeAddress],
            ...newPartialPayload,
        };
    }

    // Add CPU Usage to each node.
    _.forEach(sqlNodesLiveCPU, current => {
        let totalCpuUsagePercent: LoadingError<number> = new LELoading();

        // If there are no previous results, we just let `totalCpuUsagePercent`
        // default to `undefined` (loading).
        if (previousResults) {
            const previous = _.find(
                previousResults.sqlNodesLiveCPU,
                row => row.nodeAddress === current.nodeAddress
            );

            totalCpuUsagePercent = calculateNodeCpuUsagePercent(
                previous,
                current
            );
        }

        updatePayload(current.nodeAddress, {
            totalCpuUsagePercent,
        });
    });

    // Add used and total system memory to each host.
    _.forEach(sqlNodesLiveMemory, sqlNodesLiveMemoryRow => {
        if (sqlNodesLiveMemoryRow.usedMemoryB === null) {
            throw new Error(
                "sqlNodesLiveMemoryRow.memoryLimitB should not be null"
            );
        }

        if (sqlNodesLiveMemoryRow.totalMemoryB === null) {
            throw new Error(
                "sqlNodesLiveMemoryRow.totalMemoryB should not be null"
            );
        }

        // If CGROUP_TOTAL_B in MV_SYSINFO_MEM is larger or equal than
        // HOST_TOTAL_B, this node's memory usage is not being limited by the
        // cgroup and so we just take the host's total memory and use that as
        // the total.
        let memoryLimitB: LoadingError<BigNumber> = new LELoading();
        if (sqlNodesLiveMemoryRow.cgroupMemoryLimitB === null) {
            memoryLimitB = new LESuccess(sqlNodesLiveMemoryRow.totalMemoryB);
        } else {
            memoryLimitB = new LESuccess(
                BigNumber.min(
                    sqlNodesLiveMemoryRow.cgroupMemoryLimitB,
                    sqlNodesLiveMemoryRow.totalMemoryB
                )
            );
        }

        updatePayload(sqlNodesLiveMemoryRow.nodeAddress, {
            memoryLimitB,
            usedMemoryB: new LESuccess(sqlNodesLiveMemoryRow.usedMemoryB),
        });
    });

    _.forEach(sqlNodesLiveDisk, sqlNodesLiveDiskRow => {
        if (sqlNodesLiveDiskRow.totalDiskB === null) {
            throw new Error(
                "sqlNodesLiveDiskRow.totalDiskB should not be null"
            );
        }

        if (sqlNodesLiveDiskRow.usedDiskB === null) {
            throw new Error("sqlNodesLiveDiskRow.usedDiskB should not be null");
        }

        updatePayload(sqlNodesLiveDiskRow.nodeAddress, {
            usedDiskB: new LESuccess(sqlNodesLiveDiskRow.usedDiskB),
            totalDiskB: new LESuccess(sqlNodesLiveDiskRow.totalDiskB),
        });
    });

    return {
        type: "NODES_LIVE_MONITORING",
        error: false,
        payload,
    };
}

const NODES_LIVE_MONITORING_SAGA_ID = "nodes-live-monitoring";
const NODES_LIVE_MONITORING_SAGA_INTERVAL_MS = 1000;

export const nodesLiveMonitoringStart = makeActionCreator({
    name: "nodesLiveMonitoringStart",

    handle(
        ctx: HandlerContext,
        args?: {
            ipAddress: string;
        }
    ): Observable<NodesLiveMonitoringAction> {
        const ipAddress = args && args.ipAddress;

        return ctx.sagas.register(NODES_LIVE_MONITORING_SAGA_ID, observer => {
            let running = true;
            let timeoutId: NodeJS.Timeout;

            const emit = (action: NodesLiveMonitoringAction) =>
                running && observer.next(action);

            let previousResults: Maybe<LiveMonitoringQueryResults>;

            const liveMonitor = () => {
                return ctx.manager
                    .getPooledConnection()
                    .then(conn => {
                        multiSelect<
                            SQLNodesLiveCPURow,
                            SQLNodesLiveMemoryRow,
                            SQLNodesLiveDiskRow
                        >(
                            conn,
                            {
                                sql: SQL_NODES_LIVE_CPU(ipAddress),
                            },
                            {
                                sql: SQL_NODES_LIVE_MEMORY(ipAddress),
                            },
                            {
                                sql: SQL_NODES_LIVE_DISK(ipAddress),
                            }
                        )
                            .then(results => {
                                const resultsObj = {
                                    sqlNodesLiveCPU: results[0],
                                    sqlNodesLiveMemory: results[1],
                                    sqlNodesLiveDisk: results[2],
                                };

                                emit(
                                    buildNodesLiveMonitoringAction({
                                        previousResults,
                                        results: resultsObj,
                                    })
                                );

                                previousResults = resultsObj;
                            })
                            .catch((err: MysqlError | Error) => {
                                logError(err);

                                emit({
                                    type: "NODES_LIVE_MONITORING",
                                    error: true,
                                    payload: err.message,
                                });
                            })
                            .finally(() => {
                                conn.release();
                            });
                    })
                    .catch((err: Error) => {
                        logError(err);

                        emit({
                            type: "NODES_LIVE_MONITORING",
                            error: true,
                            payload: err.message,
                        });
                    })
                    .finally(() => {
                        if (running) {
                            timeoutId = setTimeout(
                                liveMonitor,
                                NODES_LIVE_MONITORING_SAGA_INTERVAL_MS
                            );
                        }
                    });
            };

            liveMonitor();

            return () => {
                running = false;

                if (timeoutId) {
                    clearTimeout(timeoutId);
                }
            };
        });
    },
});

export const nodesLiveMonitoringStop = makeActionCreator({
    name: "nodesLiveMonitoringStop",

    handle(ctx: HandlerContext): Observable<NodesLiveMonitoringStopAction> {
        ctx.sagas.stop(NODES_LIVE_MONITORING_SAGA_ID);

        return Observable.of<NodesLiveMonitoringStopAction>({
            type: "NODES_LIVE_MONITORING_STOP",
            error: false,
            payload: {},
        });
    },
});
