import { MysqlError } from "mysqljs";
import { Maybe } from "util/maybe";
import {
    SQLHostRow,
    SQLHostsLiveCPURow,
    SQLHostsLiveMemoryRow,
    SQLHostsLiveNetworkCumulativeRow,
    SQLNodesLiveDiskCumulativeRow,
    SQLHostsLiveDiskRow,
} from "worker/api/hosts-queries";
import { HandlerContext } from "worker/api";
import {
    HostsAction,
    HostsLiveMonitoringAction,
    HostsLiveMonitoringStopAction,
    HostsLiveMonitoringActionPayload,
    PhysicalMonitoringEnabledAction,
} from "data/actions";
import { Host, HostLiveMonitoringMetrics } from "data/models";
import { Connection } from "mysqljs";
import {
    LoadingError,
    updateLoadingError,
    LEError,
    LELoading,
    LESuccess,
} from "util/loading-error";

import _ from "lodash";
import { Observable } from "rxjs";

import { getLoadingHostLiveMonitoringMetrics } from "data/models";

import { logError } from "util/logging";
import { makeActionCreator } from "worker/api/helpers";
import { buildHostsAction } from "data/actions/hosts";
import { select, multiSelect } from "util/query";
import * as logging from "util/logging";

import {
    SQL_HOSTS_QUERY,
    SQL_HOSTS_LIVE_CPU,
    SQL_HOSTS_LIVE_MEMORY,
    SQL_HOSTS_LIVE_NETWORK_CUMULATIVE,
    SQL_HOSTS_LIVE_DISK,
    SQL_NODES_LIVE_DISK_CUMULATIVE,
    SQL_HOSTS_PHYSICAL_MONITORING_ENABLED,
} from "worker/api/hosts-queries";

const SECOND_IN_NANOSECONDS = 1000000000;

export function buildHostsPayload(rawHosts: Array<SQLHostRow>): Array<Host> {
    const hosts = _.map(
        rawHosts,
        (rawHost: SQLHostRow): Host => ({
            address: rawHost.address,
            cpuCount: rawHost.cpuCount === null ? undefined : rawHost.cpuCount,
            systemMemoryB:
                rawHost.systemMemoryMib === null
                    ? undefined
                    : rawHost.systemMemoryMib.multipliedBy(1024 * 1024),

            liveMonitoring: getLoadingHostLiveMonitoringMetrics(),
        })
    );

    return hosts;
}

export const fetchHostsData = (conn: Connection) => {
    return select(conn, SQL_HOSTS_QUERY);
};

export const queryHosts = makeActionCreator({
    name: "queryHosts",

    handle: (ctx: HandlerContext): Observable<HostsAction> => {
        const $loading = Observable.of<HostsAction>({
            type: "HOSTS",
            error: false,
            payload: { loading: true },
        });

        const $compute = Observable.fromPromise<HostsAction>(
            ctx.manager
                .getPooledConnection()
                .then(conn =>
                    fetchHostsData(conn)
                        .then(buildHostsPayload)
                        .then(hosts => buildHostsAction({ hosts }))
                        .finally(() => conn.release())
                )
                .catch(
                    (err: Error | MysqlError): HostsAction => {
                        logError(err);

                        return {
                            type: "HOSTS",
                            error: true,
                            payload: err.message,
                        };
                    }
                )
        );

        return Observable.merge($loading, $compute);
    },
});

type LiveMonitoringQueryResults = {
    sqlHostsLiveCPU: Array<SQLHostsLiveCPURow>;
    sqlHostsLiveMemory: Array<SQLHostsLiveMemoryRow>;
    sqlHostsLiveNetworkCumulative: Array<SQLHostsLiveNetworkCumulativeRow>;
    sqlHostsLiveDisk: Array<SQLHostsLiveDiskRow>;
    sqlNodesLiveDiskCumulative: Array<SQLNodesLiveDiskCumulativeRow>;
};

function buildHostsLiveMonitoringAction({
    results: {
        sqlHostsLiveCPU,
        sqlHostsLiveMemory,
        sqlHostsLiveDisk,
        sqlHostsLiveNetworkCumulative,
        sqlNodesLiveDiskCumulative,
    },
    previousResults,
}: {
    results: LiveMonitoringQueryResults;
    previousResults: Maybe<LiveMonitoringQueryResults>;
}): HostsLiveMonitoringAction {
    const payload: HostsLiveMonitoringActionPayload = {};

    // This function updates the payload for a given host IP address.
    function updatePayload(
        hostAddress: string,
        newPartialPayload: Partial<HostLiveMonitoringMetrics>
    ) {
        payload[hostAddress] = {
            ...payload[hostAddress],
            ...newPartialPayload,
        };
    }

    // Add CPU Usage to each host.
    _.forEach(sqlHostsLiveCPU, sqlHostsLiveCPURow => {
        let totalCpuUsagePercentRate: LoadingError<number> = new LELoading();
        if (previousResults) {
            const previousSqlHostsLiveCPURow = _.find(
                previousResults.sqlHostsLiveCPU,
                row => row.hostIpAddress === sqlHostsLiveCPURow.hostIpAddress
            );

            if (previousSqlHostsLiveCPURow) {
                if (sqlHostsLiveCPURow.numCpus === null) {
                    throw new Error(
                        "sqlNodesLiveCPURow.numCpus should not be null"
                    );
                }

                if (sqlHostsLiveCPURow.numCpus.isZero()) {
                    totalCpuUsagePercentRate = new LEError();
                } else {
                    totalCpuUsagePercentRate = new LESuccess(
                        sqlHostsLiveCPURow.totalUsedCumulativeNs
                            .minus(
                                previousSqlHostsLiveCPURow.totalUsedCumulativeNs
                            )
                            .dividedBy(
                                sqlHostsLiveCPURow.timestampNs.minus(
                                    previousSqlHostsLiveCPURow.timestampNs
                                )
                            )
                            .dividedBy(sqlHostsLiveCPURow.numCpus)
                            .toNumber()
                    );
                }
            } else {
                // If there are previous results, but not for this specific
                // host, we have an unknown CPU value.

                logging.log(
                    "error",
                    `Found no previous total CPU usage value for host ${
                        sqlHostsLiveCPURow.hostIpAddress
                    }.`
                );

                totalCpuUsagePercentRate = new LEError();
            }
        }

        updatePayload(sqlHostsLiveCPURow.hostIpAddress, {
            totalCpuUsagePercentRate,
        });
    });

    // Add used and total system memory to each host.
    _.forEach(sqlHostsLiveMemory, sqlHostsLiveMemoryRow => {
        updatePayload(sqlHostsLiveMemoryRow.hostIpAddress, {
            systemTotalMemoryB: new LESuccess(
                sqlHostsLiveMemoryRow.hostTotalMemoryB
            ),
            systemUsedMemoryB: new LESuccess(
                sqlHostsLiveMemoryRow.hostUsedMemoryB
            ),
        });
    });

    // Add network I/O values to each host.
    _.forEach(sqlHostsLiveNetworkCumulative, currentNetworkRow => {
        const { hostIpAddress } = currentNetworkRow;

        if (previousResults) {
            const previousNetworkRow = _.find(
                previousResults.sqlHostsLiveNetworkCumulative,
                row =>
                    row.hostIpAddress === hostIpAddress &&
                    row.interfaceName === currentNetworkRow.interfaceName
            );

            if (previousNetworkRow) {
                if (currentNetworkRow.receivedCumulativeB === null) {
                    throw new Error(
                        "currentNetworkRow.receivedCumulativeB should not be null"
                    );
                }

                if (previousNetworkRow.receivedCumulativeB === null) {
                    throw new Error(
                        "previousNetworkRow.receivedCumulativeB should not be null"
                    );
                }

                if (currentNetworkRow.timestampNs === null) {
                    throw new Error(
                        "currentNetworkRow.timestampNs should not be null"
                    );
                }

                if (previousNetworkRow.timestampNs === null) {
                    throw new Error(
                        "previousNetworkRow.timestampNs should not be null"
                    );
                }

                if (currentNetworkRow.transmittedCumulativeB === null) {
                    throw new Error(
                        "currentNetworkRow.transmittedCumulativeB should not be null"
                    );
                }

                if (previousNetworkRow.transmittedCumulativeB === null) {
                    throw new Error(
                        "previousNetworkRow.transmittedCumulativeB should not be null"
                    );
                }

                // Calculate rate and convert from bytes/ns to bytes/s
                const receivedRate = currentNetworkRow.receivedCumulativeB
                    .minus(previousNetworkRow.receivedCumulativeB)
                    .dividedBy(
                        currentNetworkRow.timestampNs.minus(
                            previousNetworkRow.timestampNs
                        )
                    )
                    .multipliedBy(SECOND_IN_NANOSECONDS)
                    .toNumber();

                // Calculate rate and convert from bytes/ns to bytes/s
                const transmittedRate = currentNetworkRow.transmittedCumulativeB
                    .minus(previousNetworkRow.transmittedCumulativeB)
                    .dividedBy(
                        currentNetworkRow.timestampNs.minus(
                            previousNetworkRow.timestampNs
                        )
                    )
                    .multipliedBy(SECOND_IN_NANOSECONDS)
                    .toNumber();

                let netReceivedRateBpS =
                    payload[hostIpAddress].netReceivedRateBpS;
                if (
                    netReceivedRateBpS === undefined ||
                    netReceivedRateBpS.isLoading()
                ) {
                    netReceivedRateBpS = new LESuccess(0);
                }

                netReceivedRateBpS = updateLoadingError(
                    netReceivedRateBpS,
                    currentNetReceivedRate => {
                        return new LESuccess(
                            currentNetReceivedRate + receivedRate
                        );
                    }
                );

                let netTransmittedRateBpS =
                    payload[hostIpAddress].netTransmittedRateBpS;
                if (
                    netTransmittedRateBpS === undefined ||
                    netTransmittedRateBpS.isLoading()
                ) {
                    netTransmittedRateBpS = new LESuccess(0);
                }

                netTransmittedRateBpS = updateLoadingError(
                    netTransmittedRateBpS,
                    currentNetTransmittedRate => {
                        return new LESuccess(
                            currentNetTransmittedRate + transmittedRate
                        );
                    }
                );

                updatePayload(hostIpAddress, {
                    netReceivedRateBpS,
                    netTransmittedRateBpS,
                });
            } else {
                // If there are previous results, but not for this specific
                // <host, network> pair, we have an unknown network I/O value.

                logging.log(
                    "error",
                    `Found no previous network value for host ${
                        currentNetworkRow.hostIpAddress
                    } on interface ${currentNetworkRow.interfaceName}.`
                );

                payload[hostIpAddress].netReceivedRateBpS = new LEError();
                payload[hostIpAddress].netTransmittedRateBpS = new LEError();
            }
        } else {
            payload[hostIpAddress].netReceivedRateBpS = new LELoading();
            payload[hostIpAddress].netTransmittedRateBpS = new LELoading();
        }
    });

    // Add used and total system disk to each host.
    _.forEach(sqlHostsLiveDisk, hostDiskInfo => {
        if (hostDiskInfo.totalDiskB === null) {
            throw new Error("hostDiskInfo.totalDiskB should not be null");
        }

        if (hostDiskInfo.usedDiskB === null) {
            throw new Error("hostDiskInfo.usedDiskB should not be null");
        }

        updatePayload(hostDiskInfo.hostIpAddress, {
            systemTotalDiskB: new LESuccess(hostDiskInfo.totalDiskB),
            systemUsedDiskB: new LESuccess(hostDiskInfo.usedDiskB),
        });
    });

    // Add disk I/O rates to each host. The READ_CUMULATIVE_B and
    // WRITE_CUMULATIVE_B columns in MV_SYSINFO_DISK corresponds to how many
    // bytes have been read from disk across all mount points (not just the one
    // specified as MOUNT_POINT) per process (per MemSQL node). So, we simply
    // group by node and ignore mount points altogether.
    _.forEach(sqlNodesLiveDiskCumulative, currentDiskCumulativeRow => {
        const { hostIpAddress, port } = currentDiskCumulativeRow;

        if (previousResults) {
            const previousDiskCumulativeRow = _.find(
                previousResults.sqlNodesLiveDiskCumulative,
                row => row.hostIpAddress === hostIpAddress && row.port.eq(port)
            );

            if (previousDiskCumulativeRow) {
                if (currentDiskCumulativeRow.readCumulativeB === null) {
                    throw new Error(
                        "currentDiskCumulativeRow.readCumulativeB should not be null"
                    );
                }

                if (currentDiskCumulativeRow.writeCumulativeB === null) {
                    throw new Error(
                        "currentDiskCumulativeRow.writeCumulativeB should not be null"
                    );
                }

                if (previousDiskCumulativeRow.readCumulativeB === null) {
                    throw new Error(
                        "previousDiskCumulativeRow.readCumulativeB should not be null"
                    );
                }

                if (previousDiskCumulativeRow.writeCumulativeB === null) {
                    throw new Error(
                        "previousDiskCumulativeRow.writeCumulativeB should not be null"
                    );
                }

                if (currentDiskCumulativeRow.timestampNs === null) {
                    throw new Error(
                        "currentDiskCumulativeRow.timestampNs should not be null"
                    );
                }

                if (previousDiskCumulativeRow.timestampNs === null) {
                    throw new Error(
                        "previousDiskCumulativeRow.timestampNs should not be null"
                    );
                }

                // Calculate rate and convert from bytes/ns to bytes/s
                const readRate = currentDiskCumulativeRow.readCumulativeB
                    .minus(previousDiskCumulativeRow.readCumulativeB)
                    .dividedBy(
                        currentDiskCumulativeRow.timestampNs.minus(
                            previousDiskCumulativeRow.timestampNs
                        )
                    )
                    .multipliedBy(SECOND_IN_NANOSECONDS)
                    .toNumber();

                // Calculate rate and convert from bytes/ns to bytes/s
                const writeRate = currentDiskCumulativeRow.writeCumulativeB
                    .minus(previousDiskCumulativeRow.writeCumulativeB)
                    .dividedBy(
                        currentDiskCumulativeRow.timestampNs.minus(
                            previousDiskCumulativeRow.timestampNs
                        )
                    )
                    .multipliedBy(SECOND_IN_NANOSECONDS)
                    .toNumber();

                let diskReadRateBpS = payload[hostIpAddress].diskReadRateBpS;
                if (
                    diskReadRateBpS === undefined ||
                    diskReadRateBpS.isLoading()
                ) {
                    diskReadRateBpS = new LESuccess(0);
                }

                diskReadRateBpS = updateLoadingError(
                    diskReadRateBpS,
                    currentDiskReadRate => {
                        return new LESuccess(currentDiskReadRate + readRate);
                    }
                );

                let diskWriteRateBpS = payload[hostIpAddress].diskWriteRateBpS;
                if (
                    diskWriteRateBpS === undefined ||
                    diskWriteRateBpS.isLoading()
                ) {
                    diskWriteRateBpS = new LESuccess(0);
                }

                diskWriteRateBpS = updateLoadingError(
                    diskWriteRateBpS,
                    currentDiskWriteRate => {
                        return new LESuccess(currentDiskWriteRate + writeRate);
                    }
                );

                updatePayload(hostIpAddress, {
                    diskReadRateBpS,
                    diskWriteRateBpS,
                });
            } else {
                // If there are previous results, but not for this specific
                // <host, port> pair, we have an unknown disk I/O
                // value.
                logging.log(
                    "error",
                    `Found no previous cumulative disk usage for host ${
                        currentDiskCumulativeRow.hostIpAddress
                    } on port ${currentDiskCumulativeRow.port}.`
                );

                payload[hostIpAddress].diskReadRateBpS = new LEError();
                payload[hostIpAddress].diskWriteRateBpS = new LEError();
            }
        } else {
            payload[hostIpAddress].diskReadRateBpS = new LELoading();
            payload[hostIpAddress].diskWriteRateBpS = new LELoading();
        }
    });

    return {
        type: "HOSTS_LIVE_MONITORING",
        error: false,
        payload,
    };
}

const HOSTS_LIVE_MONITORING_SAGA_ID = "hosts-live-monitoring";
const HOSTS_LIVE_MONITORING_SAGA_INTERVAL_MS = 1000;

export const hostsLiveMonitoringStart = makeActionCreator({
    name: "hostsLiveMonitoringStart",

    handle(ctx: HandlerContext): Observable<HostsLiveMonitoringAction> {
        return ctx.sagas.register(HOSTS_LIVE_MONITORING_SAGA_ID, observer => {
            let running = true;
            let timeoutId: NodeJS.Timeout;

            const emit = (action: HostsLiveMonitoringAction) =>
                running && observer.next(action);

            let previousResults: Maybe<LiveMonitoringQueryResults>;

            const liveMonitor = () => {
                return ctx.manager
                    .getPooledConnection()
                    .then(conn => {
                        multiSelect<
                            SQLHostsLiveCPURow,
                            SQLHostsLiveMemoryRow,
                            SQLHostsLiveNetworkCumulativeRow,
                            SQLHostsLiveDiskRow,
                            SQLNodesLiveDiskCumulativeRow
                        >(
                            conn,
                            {
                                sql: SQL_HOSTS_LIVE_CPU,
                            },
                            {
                                sql: SQL_HOSTS_LIVE_MEMORY,
                            },
                            {
                                sql: SQL_HOSTS_LIVE_NETWORK_CUMULATIVE,
                            },
                            {
                                sql: SQL_HOSTS_LIVE_DISK,
                            },
                            {
                                sql: SQL_NODES_LIVE_DISK_CUMULATIVE,
                            }
                        )
                            .then(results => {
                                const resultsObj = {
                                    sqlHostsLiveCPU: results[0],
                                    sqlHostsLiveMemory: results[1],
                                    sqlHostsLiveNetworkCumulative: results[2],
                                    sqlHostsLiveDisk: results[3],
                                    sqlNodesLiveDiskCumulative: results[4],
                                };

                                emit(
                                    buildHostsLiveMonitoringAction({
                                        previousResults,
                                        results: resultsObj,
                                    })
                                );

                                previousResults = resultsObj;
                            })
                            .catch((err: MysqlError | Error) => {
                                logError(err);

                                emit({
                                    type: "HOSTS_LIVE_MONITORING",
                                    error: true,
                                    payload: err.message,
                                });
                            })
                            .finally(() => {
                                conn.release();
                            });
                    })
                    .catch((err: Error) => {
                        logError(err);

                        emit({
                            type: "HOSTS_LIVE_MONITORING",
                            error: true,
                            payload: err.message,
                        });
                    })
                    .finally(() => {
                        if (running) {
                            timeoutId = setTimeout(
                                liveMonitor,
                                HOSTS_LIVE_MONITORING_SAGA_INTERVAL_MS
                            );
                        }
                    });
            };

            liveMonitor();

            return () => {
                running = false;

                if (timeoutId) {
                    clearTimeout(timeoutId);
                }
            };
        });
    },
});

export const hostsLiveMonitoringStop = makeActionCreator({
    name: "hostsLiveMonitoringStop",

    handle(ctx: HandlerContext): Observable<HostsLiveMonitoringStopAction> {
        ctx.sagas.stop(HOSTS_LIVE_MONITORING_SAGA_ID);

        return Observable.of<HostsLiveMonitoringStopAction>({
            type: "HOSTS_LIVE_MONITORING_STOP",
            error: false,
            payload: {},
        });
    },
});

export const isPhysicalMonitoringEnabled = makeActionCreator({
    name: "isPhysicalMonitoringEnabled",

    handle(ctx: HandlerContext): Promise<PhysicalMonitoringEnabledAction> {
        return ctx.manager.getPooledConnection().then(conn =>
            select(conn, SQL_HOSTS_PHYSICAL_MONITORING_ENABLED)
                .then(
                    (rows): PhysicalMonitoringEnabledAction => {
                        if (rows.length !== 1) {
                            throw new Error(
                                "The Physical Monitoring Enabled query should only return one row"
                            );
                        }

                        return {
                            type: "PHYSICAL_MONITORING_ENABLED",
                            error: false,
                            payload: rows[0].isPhysicalMonitoringEnabled.eq(1),
                        };
                    }
                )
                .catch(
                    (
                        err: MysqlError | Error
                    ): PhysicalMonitoringEnabledAction => {
                        logError(err);

                        return {
                            type: "PHYSICAL_MONITORING_ENABLED",
                            error: false,
                            payload: false,
                        };
                    }
                )
                .finally(() => conn.release())
        );
    },
});
