import { MysqlError } from "mysqljs";
import { Maybe } from "util/maybe";
import { Nullable } from "util/nullable";
import { HandlerContext } from "worker/api";
import { PipelinesAction } from "data/actions";

import {
    SQLPipelinesRow,
    SQLPipelinesLastBatchRow,
    SQLPipelinesFailedBatchesRow,
} from "worker/api/pipelines-queries";

import {
    Pipeline,
    PipelineState,
    PipelineSource,
    BatchState,
} from "data/models";

import { logError } from "util/logging";
import { makeActionCreator } from "worker/api/helpers";

import _ from "lodash";
import { Observable } from "rxjs";
import BigNumber from "vendor/bignumber.js/bignumber";

import { multiSelect } from "util/query";
import outerJoin from "util/joins/outer-join";
import leftJoin from "util/joins/left-join";
import { parse, differenceInSeconds } from "date-fns";
import { TextDecoder } from "text-encoding";
import { nullableToMaybe } from "util/nullable";

import {
    SQL_PIPELINES_LAST_BATCH_QUERY,
    SQL_PIPELINES_FAILED_BATCHES_QUERY,
    SQL_PIPELINES_QUERY,
} from "worker/api/pipelines-queries";

const FIELD_DECODER = new TextDecoder("utf-8");

const getPipelineState = (state: string): PipelineState => {
    switch (state) {
        case "Running":
        case "Error":
        case "Stopped":
            return state;

        default:
            return "Unknown";
    }
};

const getPipelineSourceType = (
    sourceType: Nullable<Uint8Array>
): PipelineSource => {
    if (sourceType === null) {
        return "UNKNOWN";
    }

    const sourceTypeStr = FIELD_DECODER.decode(sourceType);

    switch (sourceTypeStr) {
        case "S3":
        case "KAFKA":
        case "AZURE":
        case "FS":
        case "HDFS":
            return sourceTypeStr;

        default:
            return "UNKNOWN";
    }
};

const getBatchState = (batchState: Nullable<string>): BatchState => {
    switch (batchState) {
        case "Succeeded":
        case "Failed":
        case "Canceled":
            return batchState;

        default:
            return "Unknown";
    }
};

// This type represents the merging of SQLPipelinesFailedBatchesRow
// and SQLPipelinesLastBatchRow.
type PipelinesData = {
    pipelineId: string;

    numFailedBatches?: BigNumber;

    lastBatchState?: BatchState;
    lastBatchTimestamp?: string;
    lastBatchRowsWritten?: BigNumber;
};

const joinPipelinesData = _.curry(outerJoin)(
    (a: SQLPipelinesFailedBatchesRow, b: SQLPipelinesLastBatchRow) => {
        if (a.pipelineId < b.pipelineId) {
            return -1;
        } else if (a.pipelineId === b.pipelineId) {
            return 0;
        } else {
            return 1;
        }
    },
    (
        a: Maybe<SQLPipelinesFailedBatchesRow>,
        b: Maybe<SQLPipelinesLastBatchRow>
    ): Maybe<PipelinesData> => {
        if (a && b) {
            return {
                pipelineId: a.pipelineId,
                numFailedBatches: a.numFailedBatches,
                lastBatchState: getBatchState(b.lastBatchState),
                lastBatchTimestamp: nullableToMaybe(b.lastBatchTimestamp),
                lastBatchRowsWritten: b.lastBatchRowsWritten,
            };
        } else if (a) {
            return {
                pipelineId: a.pipelineId,
                numFailedBatches: a.numFailedBatches,
            };
        } else if (b) {
            return {
                pipelineId: b.pipelineId,
                lastBatchState: getBatchState(b.lastBatchState),
                lastBatchTimestamp: nullableToMaybe(b.lastBatchTimestamp),
                lastBatchRowsWritten: b.lastBatchRowsWritten,
            };
        }
    }
);

const joinPipelinesWithData = _.curry(leftJoin)(
    (a: SQLPipelinesRow, b: PipelinesData) => {
        if (a.pipelineId < b.pipelineId) {
            return -1;
        } else if (a.pipelineId === b.pipelineId) {
            return 0;
        } else {
            return 1;
        }
    },
    (a: SQLPipelinesRow, b: Maybe<PipelinesData>): Pipeline => {
        const basePipeline: Pipeline = {
            kind: "PIPELINE",
            pipelineId: a.pipelineId,
            pipelineName: a.pipelineName,
            databaseName: a.databaseName,
            state: getPipelineState(a.state),
            sourceType: getPipelineSourceType(a.sourceType),
            numFailedBatches: (b && b.numFailedBatches) || new BigNumber(0),
        };

        if (b && b.lastBatchState && b.lastBatchTimestamp) {
            const { lastBatchState, lastBatchTimestamp } = b;

            return {
                ...basePipeline,
                lastBatch: {
                    timestamp: parse(lastBatchTimestamp),
                    state: lastBatchState,
                    rowsWritten: b.lastBatchRowsWritten,
                },
            };
        }

        return basePipeline;
    }
);

const createPipelinesAction = (
    [sqlPipelines, sqlPipelinesFailedBatches, sqlPipelinesLastBatch]: [
        Array<SQLPipelinesRow>,
        Array<SQLPipelinesFailedBatchesRow>,
        Array<SQLPipelinesLastBatchRow>
    ],
    startDate: Date
): PipelinesAction => {
    // `pipelinesData` refers to the grouping between information
    // about pipelines' last batches and pipelines' number of failed
    // batches.
    const pipelinesData = joinPipelinesData(
        sqlPipelinesFailedBatches, // a
        sqlPipelinesLastBatch // b
    ) as Array<PipelinesData>;

    const pipelines = joinPipelinesWithData(
        sqlPipelines,
        pipelinesData
    ) as Array<Pipeline>;

    return {
        type: "PIPELINES",
        payload: {
            loading: false,
            data: {
                pipelines,
                deltaTimeS: differenceInSeconds(new Date(), startDate),
            },
        },
        error: false,
    };
};

export const queryPipelines = makeActionCreator({
    name: "queryPipelines",

    handle: (ctx: HandlerContext): Observable<PipelinesAction> => {
        const $loading = Observable.of<PipelinesAction>({
            type: "PIPELINES",
            error: false,
            payload: { loading: true },
        });

        const startDate = new Date();

        const $compute = Observable.fromPromise(
            ctx.manager
                .getPooledConnection()
                .then(conn =>
                    multiSelect<
                        SQLPipelinesRow,
                        SQLPipelinesFailedBatchesRow,
                        SQLPipelinesLastBatchRow
                    >(
                        conn,
                        { sql: SQL_PIPELINES_QUERY },
                        { sql: SQL_PIPELINES_FAILED_BATCHES_QUERY },
                        { sql: SQL_PIPELINES_LAST_BATCH_QUERY }
                    )
                        .then(pipelines =>
                            createPipelinesAction(pipelines, startDate)
                        )
                        .finally(() => conn.release())
                )
                .catch(
                    (err: Error | MysqlError): PipelinesAction => {
                        logError(err);

                        return {
                            type: "PIPELINES",
                            error: true,
                            payload: {
                                message: err.message,
                            },
                        };
                    }
                )
        );

        return Observable.merge($loading, $compute);
    },
});
