import { Nullable } from "util/nullable";
import { HandlerContext } from "worker/api";
import {
    ActiveProcessesAction,
    ActiveProcessesEnabledAction,
    LogMessageAction,
} from "data/actions";
import { MysqlError } from "mysqljs";

import BigNumber from "vendor/bignumber.js/bignumber";
import _ from "lodash";

import { logFocusError } from "data/actions/log";

import { makeActionCreator } from "worker/api/helpers";
import { Observable } from "rxjs";
import { select } from "util/query";
import { nullableToMaybe } from "util/nullable";
import { logError } from "util/logging";
import { mapFilter } from "util/observable";

/**
 * Query used to detect whether the Active Processes feature can be used.
 *
 * It counts the rows of INFORMATION_SCHEMA.TABLES whose TABLE_NAME is
 * 'MV_PROCESSLIST' and returns that count in a single column named
 * `isActiveProcessesEnabled`. The `isActiveProcessesEnabled` action creator
 * in this file treats a count greater than zero as "enabled".
 *
 * NOTE(review): the query does not filter on TABLE_SCHEMA, so a table with
 * the same name in any schema would be counted — confirm this is intended.
 */
export const SQL_ACTIVE_PROCESSES_ENABLED = `
    SELECT
        COUNT(*) AS isActiveProcessesEnabled
    FROM
        INFORMATION_SCHEMA.TABLES
    WHERE
        TABLE_NAME = 'MV_PROCESSLIST';
`;

/**
 * Shape of the single row produced by `SQL_ACTIVE_PROCESSES_ENABLED`.
 */
export type SQLActiveProcessesEnabled = {
    // Value of the COUNT(*) column; declared as a BigNumber rather than a
    // plain number.
    isActiveProcessesEnabled: BigNumber;
};

/**
 * Action creator that reports whether the Active Processes feature can be
 * used against the current cluster.
 *
 * It runs `SQL_ACTIVE_PROCESSES_ENABLED` on a pooled connection and resolves
 * with an `ACTIVE_PROCESSES_ENABLED` action whose payload is `true` when the
 * returned count is greater than zero.
 *
 * Failures are deliberately best-effort: any error (including an unexpected
 * row count) is logged and reported as "not enabled" with `error: false`,
 * so the returned promise does not reject because of a query failure.
 */
export const isActiveProcessesEnabled = makeActionCreator({
    name: "isActiveProcessesEnabled",

    /**
     * @param ctx Handler context; only `ctx.manager.getPooledConnection()`
     *     is used.
     * @returns A promise for the `ACTIVE_PROCESSES_ENABLED` action.
     */
    handle(ctx: HandlerContext): Promise<ActiveProcessesEnabledAction> {
        return ctx.manager.getPooledConnection().then(conn =>
            select(conn, SQL_ACTIVE_PROCESSES_ENABLED)
                .then(
                    (rows): ActiveProcessesEnabledAction => {
                        // COUNT(*) without GROUP BY is expected to yield a
                        // single row; anything else falls through to the
                        // catch handler below.
                        if (rows.length !== 1) {
                            throw new Error(
                                "The Active Processes Enabled query should only return one row"
                            );
                        }

                        return {
                            type: "ACTIVE_PROCESSES_ENABLED",
                            error: false,
                            payload: rows[0].isActiveProcessesEnabled.gt(0),
                        };
                    }
                )
                .catch(
                    (err: MysqlError | Error): ActiveProcessesEnabledAction => {
                        logError(err);

                        // Best effort: a failed probe is reported as
                        // "feature not enabled" rather than as an error.
                        return {
                            type: "ACTIVE_PROCESSES_ENABLED",
                            error: false,
                            payload: false,
                        };
                    }
                )
                // Always hand the connection back, on success and on
                // failure, matching the other action creators in this file.
                // Without this the pooled connection was never released.
                .finally(() => conn.release())
        );
    },
});

/**
 * Query listing the processes shown by the Active Processes feature.
 *
 * Rows come from information_schema.mv_processlist joined to
 * information_schema.mv_nodes on the node id. Processes whose command is
 * 'Sleep' and processes running on nodes of type 'LEAF' are excluded.
 *
 * `submitted` is derived in SQL by subtracting `p.time` (treated as seconds)
 * from NOW().
 *
 * String literals use single quotes so the statement is still parsed
 * correctly when the session's sql_mode includes ANSI_QUOTES, under which
 * double-quoted text is treated as an identifier.
 */
const SQL_ACTIVE_PROCESSES = `
    SELECT
        p.info AS queryText,
        p.id AS processId,
        n.id AS nodeId,
        p.time AS elapsedTime,
        TIMESTAMPADD(SECOND, -p.time, NOW()) AS submitted,
        p.state AS state,
        p.user AS user,
        n.ip_addr AS nodeHost,
        n.port AS nodePort,
        p.db AS databaseName
    FROM
        information_schema.mv_processlist p
        JOIN information_schema.mv_nodes n
        ON p.node_id = n.id
    WHERE
        p.command != 'Sleep'
        AND n.type != 'LEAF'
`;

/**
 * Shape of one row returned by `SQL_ACTIVE_PROCESSES`. Field names match the
 * column aliases in that query. Fields declared `Nullable` are converted
 * with `nullableToMaybe` by `queryActiveProcesses` before being dispatched.
 */
type SQLActiveProcess = {
    // p.info
    queryText: Nullable<string>;
    // p.id
    processId: BigNumber;
    // n.id
    nodeId: BigNumber;
    // p.time
    elapsedTime: number;
    // NOW() minus p.time seconds, computed in the query
    submitted: Date;
    // p.state
    state: Nullable<string>;
    // p.user
    user: string;
    // n.ip_addr
    nodeHost: string;
    // n.port
    nodePort: BigNumber;
    // p.db
    databaseName: Nullable<string>;
};

/**
 * Action creator that fetches the list of active processes.
 *
 * The returned observable merges two sources, both of type
 * `ACTIVE_PROCESSES`: a placeholder action with `loading: true`, and the
 * outcome of running `SQL_ACTIVE_PROCESSES` — either the mapped rows with
 * `loading: false`, or an error action carrying the error message.
 */
export const queryActiveProcesses = makeActionCreator({
    name: "queryActiveProcesses",

    /**
     * @param ctx Handler context; only `ctx.manager.getPooledConnection()`
     *     is used.
     * @returns Observable of the loading action merged with the result
     *     action.
     */
    handle(ctx: HandlerContext): Observable<ActiveProcessesAction> {
        const pendingResult = ctx.manager.getPooledConnection().then(conn =>
            select(conn, SQL_ACTIVE_PROCESSES)
                .then(
                    (rows): ActiveProcessesAction => {
                        // Copy each row field by field, wrapping the
                        // nullable columns with nullableToMaybe.
                        const data = _.map(rows, row => ({
                            queryText: nullableToMaybe(row.queryText),
                            processId: row.processId,
                            nodeId: row.nodeId,
                            elapsedTime: row.elapsedTime,
                            submitted: row.submitted,
                            state: nullableToMaybe(row.state),
                            user: row.user,
                            nodeHost: row.nodeHost,
                            nodePort: row.nodePort,
                            databaseName: nullableToMaybe(row.databaseName),
                        }));

                        return {
                            type: "ACTIVE_PROCESSES",
                            error: false,
                            payload: { loading: false, data },
                        };
                    }
                )
                .catch(
                    (err: Error | MysqlError): ActiveProcessesAction => {
                        logError(err);

                        // Query failures are surfaced as an error action
                        // instead of rejecting the promise.
                        return {
                            type: "ACTIVE_PROCESSES",
                            error: true,
                            payload: err.message,
                        };
                    }
                )
                // Release the connection whether the query succeeded or not.
                .finally(() => conn.release())
        );
        const result$ = Observable.fromPromise(pendingResult);

        const loadingAction: ActiveProcessesAction = {
            type: "ACTIVE_PROCESSES",
            error: false,
            payload: {
                loading: true,
            },
        };
        const loading$ = Observable.of<ActiveProcessesAction>(loadingAction);

        return Observable.merge(loading$, result$);
    },
});

// Statement used by `killQuery` to cancel a running query. It takes two
// positional parameters which `killQuery` supplies in this order: the
// process id first, then the node id.
const SQL_KILL_QUERY_PROCESS_NODE = `KILL QUERY ? ?`;

/**
 * Action creator that cancels a running query.
 *
 * It executes `SQL_KILL_QUERY_PROCESS_NODE` with the process id and node id
 * (both converted to plain numbers). On success the promise resolves to
 * `undefined`; on failure the error is logged and the promise resolves to a
 * focus-error log action built by `logFocusError`. The resolved value is
 * then passed through `mapFilter`.
 */
export const killQuery = makeActionCreator({
    name: "killQuery",

    /**
     * @param ctx Handler context; only `ctx.manager.getPooledConnection()`
     *     is used.
     * @param nodeId Id of the node passed as the second statement parameter.
     * @param processId Id of the process passed as the first statement
     *     parameter.
     * @returns Observable of the promise's resolved value after `mapFilter`.
     */
    handle(
        ctx: HandlerContext,
        { nodeId, processId }: { nodeId: BigNumber; processId: BigNumber }
    ): Observable<LogMessageAction> {
        const pendingKill = ctx.manager.getPooledConnection().then(conn => {
            // Parameter order matches the placeholders: process id, node id.
            const killParams = [processId.toNumber(), nodeId.toNumber()];

            return select(conn, SQL_KILL_QUERY_PROCESS_NODE, killParams)
                .then(() => undefined) // throw away the useless Rows that select returns, since it's not an action we want to dispatch
                .catch((err: Error | MysqlError) => {
                    logError(err);

                    const message = `An error occurred while canceling the query: ${err.message}`;
                    return logFocusError(message);
                })
                // Release the connection whether the kill succeeded or not.
                .finally(() => conn.release());
        });

        const outcome$ = Observable.fromPromise(pendingKill);

        return outcome$.pipe(
            mapFilter((action: LogMessageAction) => {
                // Truthy values are passed on as-is; falsy ones become
                // undefined.
                if (action) {
                    return action;
                }
                return undefined;
            })
        );
    },
});
