import { Maybe } from "util/maybe";

import {
    ClusterConnectConfig,
    StaticConnectionId,
} from "worker/net/connection-manager";

import { QueryGroupRepr, QueryExecutorMsg } from "worker/net/query-executor";

import {
    ConnectAction,
    MemsqlPingAction,
    MemsqlPingStopAction,
    CloseQueryExecutorAction,
    InitQueryExecutorAction,
    QueryGroupStartAction,
    QueryExecutorMessageAction,
    QueryGroupStopAction,
    QueryGroupContinueAction,
    QueryExecutorCancelAction,
    LogMessageAction,
    ExplainLayoutAction,
} from "data/actions";

import QueryExecutor from "worker/net/query-executor";

import { MysqlError } from "mysqljs";
import { HandlerContext } from "worker/api";

import { makeActionCreator } from "worker/api/helpers";
import {
    logInfo,
    logSilentError,
    logFocusError,
    logFocusConnectionError,
} from "data/actions/log";
import { queryExecutorMessageAction } from "data/actions/connection";

import { messageToExplainLayoutAction } from "worker/api/explain";

import { Observable } from "rxjs";
import { catchError, mergeAll } from "rxjs/operators";

import { mapFilter } from "util/observable";

import NumberFormatter from "util/number-formatter";
import { plural } from "util/string";
import { logError } from "util/logging";
import SqlString from "sqlstring";

/**
 * Connects the manager to the cluster described by `config` and reports the
 * outcome as a CONNECT action.
 *
 * On success the payload is the manager's current state. A rejection from
 * `connect` (or an error thrown while building the success action) is logged
 * and turned into an error CONNECT action whose payload carries the error
 * message, plus the error's `code` when it has one.
 */
export const initConnection = makeActionCreator({
    name: "initConnection",

    handle(
        ctx: HandlerContext,
        config: ClusterConnectConfig
    ): Promise<ConnectAction> {
        const onConnected = (): ConnectAction => {
            return {
                type: "CONNECT",
                error: false,
                payload: ctx.manager.state(),
            };
        };

        const onFailed = (err: MysqlError | Error): ConnectAction => {
            logError(err);

            // Errors exposing a `code` property forward it alongside the
            // message.
            if ("code" in err) {
                return {
                    type: "CONNECT",
                    error: true,
                    payload: {
                        code: err.code,
                        message: err.toString(),
                    },
                };
            }

            return {
                type: "CONNECT",
                error: true,
                payload: { message: err.toString() },
            };
        };

        return ctx.manager
            .connect(config)
            .then(onConnected)
            .catch(onFailed);
    },
});

// Id under which the ping loop is registered with `ctx.sagas`; the same id is
// passed to `ctx.sagas.stop` by `memsqlPingStop`.
const MEMSQL_PING_SAGA_ID = "memsql-ping";
// Delay, in milliseconds, between one ping attempt settling and the next one
// being started.
const MEMSQL_PING_INTERVAL_MS = 5000;

/**
 * Registers a saga that repeatedly pings MemSQL over a pooled connection.
 *
 * Every attempt emits a MEMSQL_PING action (with `error` set accordingly).
 * A failed attempt additionally emits a log message. The next attempt is
 * scheduled `MEMSQL_PING_INTERVAL_MS` after the previous one settles, until
 * the saga's teardown function runs.
 */
export const memsqlPing = makeActionCreator({
    name: "memsqlPing",

    handle(
        ctx: HandlerContext
    ): Observable<MemsqlPingAction | LogMessageAction> {
        return ctx.sagas.register(MEMSQL_PING_SAGA_ID, observer => {
            const type = "MEMSQL_PING";

            let active = true;
            let pendingTimer: NodeJS.Timeout;
            let previousFailed = false;

            // Anything produced after teardown is dropped.
            const emit = (action: MemsqlPingAction | LogMessageAction) => {
                if (active) {
                    observer.next(action);
                }
            };

            // Collapses every outcome into a resolved promise holding either
            // the Error or undefined for success.
            const probe = (): Promise<Maybe<Error>> =>
                ctx.manager
                    .getPooledConnection()
                    .then(
                        conn =>
                            new Promise<Maybe<Error>>(resolve => {
                                conn.ping((err: Maybe<Error>) => {
                                    conn.release();
                                    resolve(err);
                                });
                            })
                    )
                    .catch(err => err);

            const report = (failure: Maybe<Error>) => {
                if (!failure) {
                    emit({
                        type,
                        error: false,
                        payload: {},
                    });

                    previousFailed = false;
                    return;
                }

                emit({
                    type,
                    error: true,
                    payload: { message: failure.toString() },
                });

                const msg = `SQL ping failed: ${failure.toString()}`;

                // Only the first failure after a successful ping focuses the
                // Connection tab. Focusing on every failure would steal the
                // tab back every interval while the ping keeps failing, so
                // the user could not close the bottom panel or look at other
                // tabs.
                emit(
                    previousFailed
                        ? logSilentError(msg)
                        : logFocusConnectionError(msg)
                );

                previousFailed = true;
            };

            const ping = () =>
                probe()
                    .then(report)
                    .finally(() => {
                        // Re-arm only while the saga is still running.
                        if (active) {
                            pendingTimer = setTimeout(
                                ping,
                                MEMSQL_PING_INTERVAL_MS
                            );
                        }
                    });

            ping();

            // Teardown: stop emitting and cancel any scheduled attempt.
            return () => {
                active = false;

                if (pendingTimer) {
                    clearTimeout(pendingTimer);
                }
            };
        });
    },
});

/**
 * Stops the saga registered under `MEMSQL_PING_SAGA_ID` and emits a single
 * MEMSQL_PING_STOP action.
 */
export const memsqlPingStop = makeActionCreator({
    name: "memsqlPingStop",

    handle(ctx: HandlerContext): Observable<MemsqlPingStopAction> {
        ctx.sagas.stop(MEMSQL_PING_SAGA_ID);

        const stopped: MemsqlPingStopAction = {
            type: "MEMSQL_PING_STOP",
            error: false,
            payload: {},
        };

        return Observable.of(stopped);
    },
});

/**
 * Converters from query executor messages to log actions, keyed by logger
 * type. Each converter returns undefined for messages it does not log.
 */
const QUERY_LOGGERS = {
    // Logs a silent error for every query that ended in the "error" state.
    errorsOnly: (msg: QueryExecutorMsg): Maybe<LogMessageAction> => {
        if (msg.kind !== "queryMessage") {
            return undefined;
        }

        if (msg.queryMsg.kind !== "queryEnd") {
            return undefined;
        }

        const { status } = msg.queryMsg;

        return status.state === "error"
            ? logSilentError(status.message)
            : undefined;
    },

    // Logs each query as it is run, and how each query ended (error message,
    // or duration and affected row count).
    verbose: (msg: QueryExecutorMsg): Maybe<LogMessageAction> => {
        if (msg.kind === "queryRun") {
            const sql = SqlString.format(msg.query.text, msg.query.values);

            return logInfo(`Running: ${sql}`);
        }

        if (msg.kind !== "queryMessage") {
            return undefined;
        }

        if (msg.queryMsg.kind !== "queryEnd") {
            return undefined;
        }

        const { status, metrics } = msg.queryMsg;

        if (status.state === "error") {
            return logSilentError(status.message);
        }

        // A query that ended without an error must carry an end time.
        if (!metrics.endTimeUnix) {
            throw new Error("Expected end time to be set");
        }

        const elapsed = NumberFormatter.formatDuration(
            metrics.endTimeUnix - metrics.startTimeUnix
        );
        const affected = metrics.affectedRows;

        return affected === undefined
            ? logInfo(`Query OK (${elapsed})`)
            : logInfo(
                  `Query OK, ${affected} ${plural(
                      "row",
                      affected
                  )} affected (${elapsed})`
              );
    },
};

/**
 * Instantiates a query executor over a static connection to MemSQL.
 *
 * Takes an optional `perQueryRowLimit` argument which will set
 * the `SQL_SELECT_LIMIT` session variable in MemSQL.
 *
 * If an executor already exists for `id`, nothing is emitted. Otherwise an
 * INIT_QUERY_EXECUTOR action is emitted right away, followed (once the
 * executor has been created) by a stream built from the executor's messages:
 * one query-executor message action per message, any explain layout actions
 * derived from them, and the log messages produced by the `loggerType`
 * logger. If creating the executor fails, an error INIT_QUERY_EXECUTOR
 * action is emitted instead.
 */
export const initQueryExecutor = makeActionCreator({
    name: "initQueryExecutor",

    handle(
        ctx: HandlerContext,
        {
            id,
            perQueryRowLimit,
            loggerType,
            database,
        }: {
            id: StaticConnectionId;
            perQueryRowLimit?: number;
            database?: string;
            loggerType: keyof typeof QUERY_LOGGERS;
        }
    ): Observable<
        | InitQueryExecutorAction
        | QueryExecutorMessageAction
        | QueryGroupStartAction
        | LogMessageAction
        | ExplainLayoutAction
    > {
        // already initialized, ignore
        if (ctx.manager.getQueryExecutor(id)) {
            return Observable.empty();
        }

        type ExecutorAction =
            | InitQueryExecutorAction
            | LogMessageAction
            | QueryExecutorMessageAction
            | ExplainLayoutAction;

        // Turns a freshly created executor into the stream of actions
        // derived from its messages.
        const toActionStream = (
            created: QueryExecutor
        ): Observable<ExecutorAction> => {
            const $source = created.messageObservable();

            const $messageActions = $source.map(msg =>
                queryExecutorMessageAction({ msg, id })
            );

            const $explainActions: Observable<
                ExplainLayoutAction
            > = $source.pipe(mapFilter(messageToExplainLayoutAction));

            const $logActions = $source.pipe(
                mapFilter(QUERY_LOGGERS[loggerType])
            );

            const $readyLog = Observable.of(
                logInfo(`Initialized the ${id} query executor.`)
            );

            const $all = Observable.merge<ExecutorAction>(
                $messageActions,
                $explainActions,
                $logActions,
                $readyLog
            );

            // An error anywhere in the merged stream is logged and replaced
            // by a single focus-error log message.
            return $all.pipe(
                catchError((error: Error | MysqlError) => {
                    logError(error);

                    return Observable.of(
                        logFocusError(
                            `Error in SQL connection: ${error.message}`
                        )
                    );
                })
            );
        };

        // Reports a failure to create the executor as an error init action.
        const toInitFailure = (error: Error) => {
            const failed: InitQueryExecutorAction = {
                type: "INIT_QUERY_EXECUTOR",
                error: true,
                payload: error,
                meta: { id },
            };

            return Observable.of(failed);
        };

        const started: InitQueryExecutorAction = {
            type: "INIT_QUERY_EXECUTOR",
            error: false,
            payload: undefined,
            meta: { id },
        };

        const $executorActions = Observable.from(
            ctx.manager
                .createQueryExecutor(id, { perQueryRowLimit, database })
                .then(toActionStream, toInitFailure)
        ).pipe(mergeAll());

        return Observable.merge<
            | InitQueryExecutorAction
            | QueryExecutorMessageAction
            | QueryGroupStartAction
            | LogMessageAction
            | ExplainLayoutAction
        >(Observable.of(started), $executorActions);
    },
});

/**
 * Closes the query executor registered under `id`, if there is one, and
 * resolves to a QUERY_EXECUTOR_CLOSE action either way.
 */
export const closeQueryExecutor = makeActionCreator({
    name: "closeQueryExecutor",

    handle(
        ctx: HandlerContext,
        { id }: { id: StaticConnectionId }
    ): Promise<CloseQueryExecutorAction> {
        const existing = ctx.manager.getQueryExecutor(id);

        // A missing executor is not an error: there is nothing to close.
        if (existing) {
            existing.close();
        }

        const closed: CloseQueryExecutorAction = {
            type: "QUERY_EXECUTOR_CLOSE",
            error: false,
            payload: {},
        };

        return Promise.resolve(closed);
    },
});

/**
 * Cancels the query running on the `id` query executor.
 *
 * Emits QUERY_EXECUTOR_CANCEL immediately, then performs the cancellation
 * over a separate pooled connection. Any failure along the way (acquiring
 * the pooled connection, the cancel itself, or releasing the connection) is
 * logged and emitted as a focus-error log message; success emits nothing
 * further.
 *
 * @throws Error synchronously when no executor has been initialized for `id`.
 */
export const cancelQuery = makeActionCreator({
    name: "cancelQuery",
    handle(
        ctx: HandlerContext,
        { id }: { id: StaticConnectionId }
    ): Observable<QueryExecutorCancelAction | LogMessageAction> {
        const executor = ctx.manager.getQueryExecutor(id);
        if (!executor) {
            throw new Error(
                `Must initialize query executor ${id} before usage`
            );
        }

        const $canceling = Observable.of<QueryExecutorCancelAction>({
            type: "QUERY_EXECUTOR_CANCEL",
            error: false,
            payload: { id },
        });

        const toFailureLog = (err: Error | MysqlError): LogMessageAction => {
            logError(err);

            return logFocusError(
                `An error occurred while canceling the query: ${err.message}`
            );
        };

        const cancelOutcome = ctx.manager
            .getPooledConnection()
            .then(conn =>
                // Release the pooled connection exactly once, whether or
                // not the cancel succeeded.
                executor.cancelUsing(conn).finally(() => {
                    conn.release();
                })
            )
            // The rejection handler sits on the outer chain so that a
            // failure to obtain the pooled connection is reported the same
            // way as a failed cancel, instead of erroring the observable.
            .then(() => undefined, toFailureLog);

        const $errorLogs = Observable.fromPromise(
            cancelOutcome
            // Filter out the "undefined" value produced when the cancel
            // succeeded and there is nothing to log.
        ).pipe(mapFilter((v: LogMessageAction) => (v ? v : undefined)));

        // We immediately return QUERY_EXECUTOR_CANCEL to the UI
        // so that the "Canceling" state is immediately activated, even
        // if the actual "KILL QUERY" command takes a little bit to
        // execute (or fails).
        return Observable.merge($canceling, $errorLogs);
    },
});

/**
 * Hands `queryGroup` to the `id` query executor via `runQueryGroup` and
 * resolves to a QUERY_GROUP_START action describing it.
 *
 * Throws synchronously when no executor has been initialized for `id`.
 */
export const staticQueryGroup = makeActionCreator({
    name: "staticQueryGroup",
    handle(
        ctx: HandlerContext,
        {
            id,
            queryGroup,
        }: { id: StaticConnectionId; queryGroup: QueryGroupRepr }
    ): Promise<QueryGroupStartAction> {
        const target = ctx.manager.getQueryExecutor(id);

        if (target) {
            target.runQueryGroup(queryGroup);

            const started: QueryGroupStartAction = {
                type: "QUERY_GROUP_START",
                error: false,
                payload: { id, queryGroup },
            };

            return Promise.resolve(started);
        }

        throw new Error(
            `Must initialize query group executor ${id} before usage`
        );
    },
});

/**
 * Calls `stopQueryGroup()` on the `id` query executor and resolves to a
 * QUERY_GROUP_STOP action.
 *
 * Throws synchronously when no executor has been initialized for `id`.
 */
export const stopQueryGroup = makeActionCreator({
    name: "stopQueryGroup",
    handle(
        ctx: HandlerContext,
        { id }: { id: StaticConnectionId }
    ): Promise<QueryGroupStopAction> {
        const target = ctx.manager.getQueryExecutor(id);

        if (target) {
            target.stopQueryGroup();

            const stopped: QueryGroupStopAction = {
                type: "QUERY_GROUP_STOP",
                error: false,
                payload: { id },
            };

            return Promise.resolve(stopped);
        }

        throw new Error(
            `Must initialize query group executor ${id} before usage`
        );
    },
});

/**
 * Calls `continueQueryGroup(skip)` on the `id` query executor and resolves
 * to a QUERY_GROUP_CONTINUE action carrying the same `id` and `skip`.
 *
 * Throws synchronously when no executor has been initialized for `id`.
 */
export const continueQueryGroup = makeActionCreator({
    name: "continueQueryGroup",
    handle(
        ctx: HandlerContext,
        { id, skip }: { id: StaticConnectionId; skip: boolean }
    ): Promise<QueryGroupContinueAction> {
        const target = ctx.manager.getQueryExecutor(id);

        if (target) {
            target.continueQueryGroup(skip);

            const continued: QueryGroupContinueAction = {
                type: "QUERY_GROUP_CONTINUE",
                error: false,
                payload: { id, skip },
            };

            return Promise.resolve(continued);
        }

        throw new Error(
            `Must initialize query group executor ${id} before usage`
        );
    },
});
