import { Maybe } from "util/maybe";

import { QueryRepr, StaticConnectionMsg } from "worker/net/static-connection";
import { Connection } from "mysqljs";

import { Observable } from "rxjs";
import EventEmitter from "eventemitter3";
import StaticConnection from "worker/net/static-connection";

import { select } from "util/query";

// This is emitted whenever this QueryExecutor changes state. It is also the
// first message delivered by messageObservable(), carrying the state at the
// time the observable was created.
//
// errorMsg is only populated when the executor becomes TERMINATED, in which
// case it is the errorMsg forwarded from the underlying StaticConnection's
// TERMINATED state message.
export type MsgQueryExecutorState = {
    kind: "queryExecutorState";
    state: QueryExecutorState;
    errorMsg?: string;
};

// Messages from the underlying StaticConnection about individual queries
// (result sets, rows and query ends) are wrapped, annotated as to whether they
// belong to the last query in the query group, and emitted as this.
//
// Note: If a query group is cancelled or stopped before it reaches the last
// query of the group, the last messages of this type emitted by QueryExecutor
// before it goes to IDLE state may not have lastInGroup = true. However, the
// value of lastInGroup is consistent across all messages caused by a single
// query.
//
// queryMsg     the unmodified message received from the StaticConnection
// lastInGroup  true if no further queries were queued when the message arrived
// meta         the meta of the query group that was current at that time
export type MsgQueryMessage = {
    kind: "queryMessage";
    queryMsg: StaticConnectionMsg;
    lastInGroup: boolean;
    meta?: QueryGroupMeta;
};

// This is emitted right before running each query of a query group on the
// underlying StaticConnection. Queries that the QueryExecutor issues on its own
// to maintain session variables are "silent" and do not produce this message,
// unless they fail, in which case it is emitted after the fact together with
// the failing queryEnd.
export type MsgQueryRun = {
    kind: "queryRun";
    query: QueryRepr;
    meta?: QueryGroupMeta;
};

// This is emitted whenever the underlying StaticConnection reports a
// "selectedDatabaseChanged" message; the database value is forwarded as-is.
export type MsgSelectedDatabaseChanged = {
    kind: "selectedDatabaseChanged";
    database: Maybe<string>;
};

// Every payload a QueryExecutor emits on its "message" event is one of these,
// discriminated by the `kind` field.
export type QueryExecutorMsg =
    | MsgQueryExecutorState
    | MsgQueryMessage
    | MsgQueryRun
    | MsgSelectedDatabaseChanged;

// A QueryExecutor can be in the following five states:
// IDLE         ready to run a group of queries
// INITIALIZING currently running queries to set up the connection;
//              these queries should not be canceled
// ACTIVE       currently running a group of queries submitted by a client;
//              this group of queries could be canceled midway
// PAUSED       midway through running a group of queries when a query failed;
//              waiting for continueQueryGroup or stopQueryGroup to be called
// TERMINATED   no longer valid due to the underlying StaticConnection also
//              becoming TERMINATED
export type QueryExecutorState =
    | "IDLE"
    | "INITIALIZING"
    | "ACTIVE"
    | "PAUSED"
    | "TERMINATED";

// There are three ways query executors can handle a failed query in a group:
// - SKIP it and continue executing later queries;
// - STOP executing the entire query group, discard the queries that have not
//   run yet, and become IDLE; or
// - PAUSE, enter the PAUSED state and wait for somebody to call either
//   continueQueryGroup or stopQueryGroup. The executor does not pause if the
//   failed query was the last one in the group; it simply becomes IDLE.
export type QueryExecutorOnErrorMode = "SKIP" | "STOP" | "PAUSE";

// Caller-supplied tag for a query group. The executor never inspects it; it is
// only echoed back on the MsgQueryRun and MsgQueryMessage messages produced
// while the group is running.
// - SHOW_PLAN        marks a group whose results are a query plan of the given
//                    planType. NOTE(review): `source` is presumably the text
//                    of the query being explained/profiled -- confirm against
//                    callers.
// - SELECT_DATABASE  marks a group that switches the selected database (see
//                    createSelectDatabaseQueryGroupRepr).
export type QueryGroupMeta =
    | {
          kind: "SHOW_PLAN";
          planType: "EXPLAIN" | "PROFILE";
          source: string;
      }
    | {
          kind: "SELECT_DATABASE";
      };

// A group of queries submitted together to QueryExecutor.runQueryGroup. The
// queries are run in order, one at a time, on the underlying StaticConnection.
export type QueryGroupRepr = {
    // The queries to run, in execution order. An empty list makes
    // runQueryGroup a no-op.
    queries: Array<QueryRepr>;

    // What to do when one of the queries fails.
    onError: QueryExecutorOnErrorMode;

    // If this is true, when running a query group with at least two queries, we
    // set the select limit to 0 before the first query and reset it to the
    // default value before the last query, so that the limit is 0 while
    // executing all queries other than the last.
    limitAllButLast: boolean;

    // Whatever callers pass here will be attached back to MsgQueryMessages and
    // MsgQueryRuns without affecting the execution of the query on the worker
    // side. We use this to, for example, mark EXPLAIN and PROFILE query groups
    // that are meant for Visual Explain, so that the messages can be sent
    // through the layout algorithm and also ignored by the query editor
    // reducer that normally displays the results in the query executor table.
    meta?: QueryGroupMeta;
};

const SQL_KILL_QUERY = `KILL QUERY ?`;
const SQL_SET_SELECT_LIMIT = "SET SESSION `sql_select_limit` = ?";
// Used instead of SQL_SET_SELECT_LIMIT when there is no configured limit to
// go back to. Binding `undefined` to the `?` placeholder would be sent as NULL,
// which is not a valid value for sql_select_limit; DEFAULT restores the
// server's default for the session.
const SQL_RESET_SELECT_LIMIT = "SET SESSION `sql_select_limit` = DEFAULT";

// Runs groups of queries, one query at a time, on a single StaticConnection
// and reports progress through "message" events (see QueryExecutorMsg).
//
// The executor listens to the StaticConnection's own "message" events: every
// time the connection reports that it is IDLE again, the executor decides
// whether to run the next queued query, pause, or finish the group.
export default class QueryExecutor extends EventEmitter {
    staticConnection: StaticConnection;

    // The row limit passed at construction time, if any. It is what the select
    // limit is restored to after it was temporarily set to 0.
    defaultPerQueryRowLimit: Maybe<number>;

    state: QueryExecutorState;

    // Queries of the current group that have not been started yet.
    queryQueue: Array<QueryRepr>;

    // Error handling mode of the current group.
    onError: Maybe<QueryExecutorOnErrorMode>;

    // Whether the most recently finished query ended with an error.
    queryErrored: boolean;

    // When we run a query to set a session variable on the underlying
    // StaticConnection, we will not emit a queryRun event and this variable
    // will be set to the query. It tells us not to wrap and emit the next
    // queryEnd if it's successful, since it's for a query that we chose to emit
    // ourselves in order to maintain the session variables, not a query that we
    // were told to run.  However, if the silenced query fails, we will emit
    // both the queryRun and queryEnd to be consistent.
    currentSilencedQuery: Maybe<QueryRepr>;

    // This flag is set if we set the row limit to 0, with the goal of limiting
    // all but the last query in the queue, and need to reset it to the default
    // before the last query in the queue.
    needsRowLimitReset: boolean;

    // This flag is set if the current query group is cancelled. If set, when
    // the StaticConnection next becomes idle, we will clear the queryQueue
    // instead of continuing to execute the queries in it. We don't want to
    // clear the queryQueue in the middle of a query executing on the underlying
    // StaticConnection, because that will cause the lastInGroup flags among our
    // emitted messages for the single query to be inconsistent.
    cancelled: boolean;

    // Meta of the current query group; attached to emitted query messages.
    currentMeta: Maybe<QueryGroupMeta>;

    // Subscribes to the given StaticConnection. If perQueryRowLimit is given,
    // the executor starts out INITIALIZING and immediately applies the limit
    // with a silent query (which requires the connection to be IDLE already);
    // otherwise it starts out IDLE.
    constructor(
        staticConnection: StaticConnection,
        { perQueryRowLimit }: { perQueryRowLimit?: number }
    ) {
        super();
        this.staticConnection = staticConnection;
        staticConnection.on("message", this.handleMessage);
        this.queryQueue = [];
        this.queryErrored = false;
        this.currentSilencedQuery = undefined;
        this.defaultPerQueryRowLimit = perQueryRowLimit;
        this.needsRowLimitReset = false;
        this.cancelled = false;
        this.currentMeta = undefined;

        if (perQueryRowLimit === undefined) {
            this.state = "IDLE";
        } else {
            this.state = "INITIALIZING";
            // since we could call setRowLimit internally at the start or in
            // the middle of query groups, it should not be responsible for
            // emitting state
            this.setRowLimit(perQueryRowLimit);
        }
    }

    // Marks the current group as cancelled and asks the server, over the given
    // separate connection, to kill whatever query is running on the static
    // connection's thread. The remaining queue is only dropped once the static
    // connection reports IDLE again (see `cancelled`).
    cancelUsing(connection: Connection): Promise<unknown> {
        this.cancelled = true;
        return select(connection, SQL_KILL_QUERY, [
            this.staticConnection.connection.threadId,
        ]);
    }

    // Closes the underlying StaticConnection.
    close(): void {
        this.staticConnection.close();
    }

    // Silently sets the session's select limit on the static connection.
    // Passing undefined restores the server default. Throws if the static
    // connection is not IDLE or if the executor is neither INITIALIZING nor
    // ACTIVE. Does not emit any state or queryRun message by itself.
    setRowLimit(limit: Maybe<number>) {
        if (this.staticConnection.state !== "IDLE") {
            throw new Error(
                "Expected static connection to be idle when setting row limit"
            );
        }

        if (this.state !== "INITIALIZING" && this.state !== "ACTIVE") {
            throw new Error(
                "Expected setRowLimit to only be called internally while query executor is INITIALIZING or ACTIVE"
            );
        }

        // Without a concrete limit there is nothing meaningful to bind to the
        // placeholder, so ask the server for its default instead.
        const query: QueryRepr =
            limit === undefined
                ? { text: SQL_RESET_SELECT_LIMIT, values: [] }
                : { text: SQL_SET_SELECT_LIMIT, values: [limit] };

        this.currentSilencedQuery = query;
        this.staticConnection.runQuery(query);
    }

    emitMessage(message: QueryExecutorMsg) {
        // simple wrapper that is typechecked
        this.emit("message", message);
    }

    // Records the new state and announces it to listeners.
    emitState(state: QueryExecutorState, errorMsg?: string) {
        this.state = state;
        this.emitMessage({ kind: "queryExecutorState", state, errorMsg });
    }

    // Returns an observable of this executor's messages, preceded by a
    // queryExecutorState message carrying the state at the time of this call.
    messageObservable(): Observable<QueryExecutorMsg> {
        const init$ = Observable.of<QueryExecutorMsg>({
            kind: "queryExecutorState",
            state: this.state,
        });
        const info$: Observable<QueryExecutorMsg> = Observable.fromEvent(
            this,
            "message"
        );
        return Observable.merge(init$, info$);
    }

    // Starts the next step of the current group: either the silent row limit
    // reset that has to precede the last query, or the query at the head of
    // the queue. Throws if the static connection is busy or the queue is
    // empty.
    runNextQuery() {
        if (this.staticConnection.state !== "IDLE") {
            throw new Error(
                "Expected static connection to be idle when running query in group"
            );
        }

        if (this.queryQueue.length <= 0) {
            throw new Error("Expected query queue to be nonempty");
        }

        if (this.queryQueue.length === 1 && this.needsRowLimitReset) {
            // The last query stays queued; it runs once the static connection
            // becomes IDLE again after the reset.
            this.needsRowLimitReset = false;
            this.setRowLimit(this.defaultPerQueryRowLimit);
        } else {
            const [query, ...rest] = this.queryQueue;
            this.queryQueue = rest;
            this.queryErrored = false;
            this.emitMessage({
                kind: "queryRun",
                query,
                meta: this.currentMeta,
            });
            this.staticConnection.runQuery(query);
        }
    }

    // Listener for the static connection's "message" event. Declared as an
    // arrow function property so that it can be passed to `on` unbound.
    handleMessage = (msg: StaticConnectionMsg) => {
        // The running query has already been removed from the queue, so an
        // empty queue means the message belongs to the group's last query.
        let lastInGroup = this.queryQueue.length === 0;
        switch (msg.kind) {
            case "state":
                if (msg.state === "IDLE") {
                    if (
                        this.cancelled ||
                        (this.queryErrored && this.onError === "STOP")
                    ) {
                        // drop whatever has not run yet and finish the group
                        this.cancelled = false;
                        this.queryQueue = [];
                        lastInGroup = true;
                    }

                    if (lastInGroup) {
                        // all done with this group
                        if (this.needsRowLimitReset) {
                            // we may need to reset the row limit if we ended up
                            // here due to getting canceled halfway through our
                            // queries
                            this.needsRowLimitReset = false;
                            this.setRowLimit(this.defaultPerQueryRowLimit);
                        } else {
                            // ready to run next group
                            this.emitState("IDLE");
                        }
                    } else if (this.queryErrored && this.onError === "PAUSE") {
                        // last query failed, ask user to decide what to do
                        this.emitState("PAUSED");
                    } else {
                        // whenever the static connection becomes IDLE and we
                        // have a next query in the current group to run, run it
                        this.runNextQuery();
                    }
                } else if (msg.state === "TERMINATED") {
                    // unrecoverable error
                    this.emitState("TERMINATED", msg.errorMsg);
                }
                break;

            case "resultSet":
            case "rows":
                if (this.currentSilencedQuery) {
                    throw new Error(
                        "Expected silent queries not to have resultSets or rows"
                    );
                }
                this.emitMessage({
                    kind: "queryMessage",
                    queryMsg: msg,
                    lastInGroup,
                    meta: this.currentMeta,
                });
                break;

            case "queryEnd": {
                const silencedQuery = this.currentSilencedQuery;
                if (silencedQuery !== undefined) {
                    this.currentSilencedQuery = undefined;

                    // We hope that silent queries don't fail. If they do fail,
                    // which should be rare, we bail out of the current queue of
                    // queries since we can't be sure what state we're in any
                    // more. Then, we try to report the error as informatively
                    // and as consistently with other errors as we can, which
                    // means supplying a queryRun message that was previously
                    // silenced.
                    if (msg.status.state === "error") {
                        this.queryQueue = [];
                        this.emitMessage({
                            kind: "queryRun",
                            query: silencedQuery,
                            meta: this.currentMeta,
                        });
                        this.emitMessage({
                            kind: "queryMessage",
                            queryMsg: msg,
                            lastInGroup: true,
                            meta: this.currentMeta,
                        });
                    }
                } else {
                    this.emitMessage({
                        kind: "queryMessage",
                        queryMsg: msg,
                        lastInGroup,
                        meta: this.currentMeta,
                    });
                }
                // consulted when the static connection next becomes IDLE
                this.queryErrored = msg.status.state === "error";
                break;
            }

            case "selectedDatabaseChanged": {
                this.emitMessage({
                    kind: "selectedDatabaseChanged",
                    database: msg.database,
                });

                break;
            }
        }
    };

    // Starts running the given group. Does nothing if the executor is not IDLE
    // or if the group contains no queries.
    runQueryGroup(group: QueryGroupRepr) {
        if (this.state !== "IDLE") {
            // can't run a query group while another one is in progress
            return;
        }

        const { queries, onError, limitAllButLast, meta } = group;
        if (queries.length === 0) {
            // nothing to do
            return;
        }

        // Set up all per-group bookkeeping before announcing ACTIVE, so that
        // listeners reacting synchronously to the state change observe the new
        // group and cannot have their effects (e.g. a cancel) overwritten.
        // The queue is a copy so that later changes to the caller's array
        // cannot alter what we run.
        this.queryQueue = [...queries];
        this.onError = onError;
        this.queryErrored = false;
        this.cancelled = false;
        this.currentMeta = meta;
        this.emitState("ACTIVE");

        if (limitAllButLast && queries.length > 1) {
            // the first query starts once this silent query has finished
            this.needsRowLimitReset = true;
            this.setRowLimit(0);
        } else {
            this.runNextQuery();
        }
    }

    // Resumes a PAUSED group with its next query. If skipErrors is true, later
    // failures are skipped; otherwise the executor pauses again on the next
    // failure. Throws if the executor is not PAUSED.
    continueQueryGroup(skipErrors: boolean) {
        if (this.state !== "PAUSED") {
            throw new Error("Expected state to be paused");
        }

        this.onError = skipErrors ? "SKIP" : "PAUSE";
        this.emitState("ACTIVE");
        this.runNextQuery();
    }

    // Abandons a PAUSED group, discarding the queries that have not run yet.
    // Throws if the executor is not PAUSED.
    stopQueryGroup() {
        if (this.state !== "PAUSED") {
            throw new Error("Expected state to be paused");
        }

        this.queryQueue = [];
        this.queryErrored = false;

        if (this.needsRowLimitReset) {
            // setRowLimit may only run while ACTIVE; the executor becomes IDLE
            // once the static connection reports IDLE after the reset.
            this.needsRowLimitReset = false;
            this.emitState("ACTIVE");
            this.setRowLimit(this.defaultPerQueryRowLimit);
        } else {
            this.emitState("IDLE");
        }
    }
}

// Builds the single-query group that switches the selected database to
// `database`. The group stops on error, does not touch the select limit, and
// is tagged with SELECT_DATABASE meta so that its messages can be recognised.
//
// NOTE(review): USE takes an identifier while `?` is a value placeholder;
// confirm that the connection layer renders this in a form the server accepts.
export const createSelectDatabaseQueryGroupRepr = (
    database: string
): QueryGroupRepr => {
    const useStatement = { text: "USE ?", values: [database] };

    const group: QueryGroupRepr = {
        queries: [useStatement],
        onError: "STOP",
        limitAllButLast: false,
        meta: { kind: "SELECT_DATABASE" },
    };

    return group;
};
