import { MysqlError } from "mysqljs";
import { Maybe } from "util/maybe";
import { Nullable } from "util/nullable";
import { LooseObject } from "util/loose-object";
import {
    MvNodeId,
    MvNode,
    MvActivityType,
    MvActivityName,
    MvActivityLowLevel,
    MvActivityLowLevelCumulative,
    MvStatistics,
    MvCumulativeStatistics,
    MvActivityID,
} from "data/models";

import { MvFullStartAction, MvFullAction, MvFullPayload } from "data/actions";
import { HandlerContext } from "worker/api";

import _ from "lodash";
import { Observable } from "rxjs";
import BigNumber from "vendor/bignumber.js/bignumber";

import { makeActionCreator } from "worker/api/helpers";
import { select, multiSelect } from "util/query";
import monus from "util/monus";
import differenceInSeconds from "date-fns/difference_in_seconds";
import * as logging from "util/logging";
import { nullableToMaybe } from "util/nullable";

import { mvFull } from "data/actions/management-views";

// Reads the `read_advanced_counters` engine variable. The first returned
// row's `Value` column is compared against "ON" when building MV_FULL
// payloads (see the `advancedCounters` field of the payload `repr`).
const SQL_GET_READ_ADVANCED_COUNTERS = `
    SHOW VARIABLES LIKE 'read_advanced_counters'
`;

// Row shape expected from SQL_GET_READ_ADVANCED_COUNTERS. Only the `Value`
// column is read by this module.
type SQLAdvancedCounter = { Value: "ON" | "OFF" };

// Lists every row of information_schema.mv_nodes, aliasing the columns to the
// camelCase field names of `SQLMvNode`. The TYPE codes 'MA', 'CA' and 'LEAF'
// are translated to 'MASTER_AGGREGATOR', 'AGGREGATOR' and 'LEAF'; the CASE
// has no ELSE branch, so any other TYPE value yields a NULL role.
const SQL_MV_NODES = `
    SELECT
        ID AS nodeId,
        IP_ADDR AS ipAddr,
        PORT AS port,
        (CASE
            WHEN TYPE='MA' THEN 'MASTER_AGGREGATOR'
            WHEN TYPE='CA' THEN 'AGGREGATOR'
            WHEN TYPE='LEAF' THEN 'LEAF'
        END) AS role,
        STATE AS state,
        AVAILABILITY_GROUP AS availabilityGroup,
        NUM_CPUS AS numCpus,
        MAX_MEMORY_MB AS maxMemoryMb,
        MEMORY_USED_MB AS memoryUsedMb
    FROM information_schema.mv_nodes
`;

// Row shape returned by SQL_MV_NODES. Numeric columns arrive as BigNumber;
// `sqlNodeToMvNode` converts a row into the `MvNode` model (stringifying
// `nodeId` and mapping the nullable columns through `nullableToMaybe`).
type SQLMvNode = {
    nodeId: BigNumber;
    ipAddr: string;
    port: BigNumber;
    role: string;
    state: string;
    availabilityGroup: Nullable<BigNumber>;
    numCpus: Nullable<BigNumber>;
    maxMemoryMb: Nullable<BigNumber>;
    memoryUsedMb: Nullable<BigNumber>;
};

// Snapshot of information_schema.mv_activities_extended_cumulative, used by
// both `mvFullStart` and `mvFullStop`; the stop handler diffs its snapshot
// against the one taken at start.
//
// - Every counter is wrapped in IFNULL(..., 0) so rows never carry NULL counters.
// - `networkB` is the sum of the logical receive and send byte counters.
// - `diskB` is the sum of the logical read, logical write and log buffer
//   write byte counters.
// - `queryText` comes from a LEFT JOIN on information_schema.mv_queries by
//   ACTIVITY_NAME, so it is NULL when no matching row exists.
//
// The column list must stay in sync with SQL_MV_ACTIVITIES, since both result
// sets are read as `SQLMvActivity`.
const SQL_MV_ACTIVITIES_CUMULATIVE = `
    SELECT
        a.NODE_ID AS nodeId,
        a.ACTIVITY_TYPE AS activityType,
        a.ACTIVITY_NAME AS activityName,
        a.AGGREGATOR_ACTIVITY_NAME AS aggregatorActivityName,
        a.DATABASE_NAME AS databaseName,
        a.PARTITION_ID AS partitionId,
        q.QUERY_TEXT AS queryText,

        IFNULL(a.RUN_COUNT, 0) AS runCount,
        IFNULL(a.SUCCESS_COUNT, 0) AS successCount,
        IFNULL(a.FAILURE_COUNT, 0) AS failureCount,

        IFNULL(a.CPU_TIME_MS, 0) AS cpuTimeMs,
        IFNULL(a.CPU_WAIT_TIME_MS, 0) AS cpuWaitTimeMs,
        IFNULL(a.ELAPSED_TIME_MS, 0) AS elapsedTimeMs,

        IFNULL(a.MEMORY_BS, 0) AS memoryBs,
        IFNULL(a.MEMORY_MAJOR_FAULTS, 0) AS memoryMajorFaults,

        IFNULL(a.LOCK_TIME_MS, 0) AS lockTimeMs,
        IFNULL(a.LOCK_ROW_TIME_MS, 0) AS lockRowTimeMs,

        IFNULL(a.LOG_BUFFER_TIME_MS, 0) AS logBufferTimeMs,
        IFNULL(a.LOG_FLUSH_TIME_MS, 0) AS logFlushTimeMs,
        IFNULL(a.LOG_BUFFER_WRITE_B, 0) AS logBufferWriteB,

        ( IFNULL(a.NETWORK_LOGICAL_RECV_B, 0) +
        IFNULL(a.NETWORK_LOGICAL_SEND_B, 0)
        ) AS networkB,
        IFNULL(a.NETWORK_TIME_MS, 0) AS networkTimeMs,
        IFNULL(a.NETWORK_LOGICAL_RECV_B, 0) AS networkLogicalRecvB,
        IFNULL(a.NETWORK_LOGICAL_SEND_B, 0) AS networkLogicalSendB,

        ( IFNULL(a.DISK_LOGICAL_READ_B, 0) +
        IFNULL(a.DISK_LOGICAL_WRITE_B, 0) +
        IFNULL(a.LOG_BUFFER_WRITE_B, 0)
        ) AS diskB,
        IFNULL(a.DISK_TIME_MS, 0) AS diskTimeMs,
        IFNULL(a.DISK_LOGICAL_READ_B, 0) AS diskLogicalReadB,
        IFNULL(a.DISK_LOGICAL_WRITE_B, 0) AS diskLogicalWriteB,
        IFNULL(a.DISK_PHYSICAL_READ_B, 0) AS diskPhysicalReadB,
        IFNULL(a.DISK_PHYSICAL_WRITE_B, 0) AS diskPhysicalWriteB
    FROM
        information_schema.mv_activities_extended_cumulative a
        LEFT JOIN information_schema.mv_queries q
        ON a.ACTIVITY_NAME = q.ACTIVITY_NAME
`;

// Same column list as SQL_MV_ACTIVITIES_CUMULATIVE, but reads from
// information_schema.mv_activities_extended instead of the cumulative table.
// `mvFullFixedInterval` runs it on a connection where
// `activities_delta_sleep_s` has just been set (see SQL_SET_DELTA_SLEEP).
//
// - Every counter is wrapped in IFNULL(..., 0) so rows never carry NULL counters.
// - `networkB` is the sum of the logical receive and send byte counters.
// - `diskB` is the sum of the logical read, logical write and log buffer
//   write byte counters.
// - `queryText` comes from a LEFT JOIN on information_schema.mv_queries by
//   ACTIVITY_NAME, so it is NULL when no matching row exists.
const SQL_MV_ACTIVITIES = `
    SELECT
        a.NODE_ID AS nodeId,
        a.ACTIVITY_TYPE AS activityType,
        a.ACTIVITY_NAME AS activityName,
        a.AGGREGATOR_ACTIVITY_NAME AS aggregatorActivityName,
        a.DATABASE_NAME AS databaseName,
        a.PARTITION_ID AS partitionId,
        q.QUERY_TEXT AS queryText,

        IFNULL(a.RUN_COUNT, 0) AS runCount,
        IFNULL(a.SUCCESS_COUNT, 0) AS successCount,
        IFNULL(a.FAILURE_COUNT, 0) AS failureCount,

        IFNULL(a.CPU_TIME_MS, 0) AS cpuTimeMs,
        IFNULL(a.CPU_WAIT_TIME_MS, 0) AS cpuWaitTimeMs,
        IFNULL(a.ELAPSED_TIME_MS, 0) AS elapsedTimeMs,

        IFNULL(a.MEMORY_BS, 0) AS memoryBs,
        IFNULL(a.MEMORY_MAJOR_FAULTS, 0) AS memoryMajorFaults,

        IFNULL(a.LOCK_TIME_MS, 0) AS lockTimeMs,
        IFNULL(a.LOCK_ROW_TIME_MS, 0) AS lockRowTimeMs,

        IFNULL(a.LOG_BUFFER_TIME_MS, 0) AS logBufferTimeMs,
        IFNULL(a.LOG_FLUSH_TIME_MS, 0) AS logFlushTimeMs,
        IFNULL(a.LOG_BUFFER_WRITE_B, 0) AS logBufferWriteB,

        ( IFNULL(a.NETWORK_LOGICAL_RECV_B, 0) +
        IFNULL(a.NETWORK_LOGICAL_SEND_B, 0)
        ) AS networkB,
        IFNULL(a.NETWORK_TIME_MS, 0) AS networkTimeMs,
        IFNULL(a.NETWORK_LOGICAL_RECV_B, 0) AS networkLogicalRecvB,
        IFNULL(a.NETWORK_LOGICAL_SEND_B, 0) AS networkLogicalSendB,

        ( IFNULL(a.DISK_LOGICAL_READ_B, 0) +
        IFNULL(a.DISK_LOGICAL_WRITE_B, 0) +
        IFNULL(a.LOG_BUFFER_WRITE_B, 0)
        ) AS diskB,
        IFNULL(a.DISK_TIME_MS, 0) AS diskTimeMs,
        IFNULL(a.DISK_LOGICAL_READ_B, 0) AS diskLogicalReadB,
        IFNULL(a.DISK_LOGICAL_WRITE_B, 0) AS diskLogicalWriteB,
        IFNULL(a.DISK_PHYSICAL_READ_B, 0) AS diskPhysicalReadB,
        IFNULL(a.DISK_PHYSICAL_WRITE_B, 0) AS diskPhysicalWriteB
    FROM
        information_schema.mv_activities_extended a
        LEFT JOIN information_schema.mv_queries q
        ON a.ACTIVITY_NAME = q.ACTIVITY_NAME
`;

// Row shape shared by SQL_MV_ACTIVITIES and SQL_MV_ACTIVITIES_CUMULATIVE.
//
// The first group identifies the activity (these fields, minus `queryText`,
// are what `hashMvActivity` joins into an ID). The remaining groups are the
// counters; the SQL wraps each of them in IFNULL(..., 0), which is why they
// are typed as non-nullable BigNumber.
//
// NOTE(review): `aggregatorActivityName` is typed `Maybe` while the other
// optional columns are `Nullable`; consumers only test it with `||`, so either
// representation works — confirm which one the driver actually produces.
type SQLMvActivity = {
    nodeId: BigNumber;
    activityName: MvActivityName;
    activityType: MvActivityType;
    aggregatorActivityName: Maybe<MvActivityName>;
    databaseName: Nullable<string>;
    partitionId: Nullable<BigNumber>;
    queryText: Nullable<string>;

    runCount: BigNumber;
    successCount: BigNumber;
    failureCount: BigNumber;

    cpuTimeMs: BigNumber;
    cpuWaitTimeMs: BigNumber;
    elapsedTimeMs: BigNumber;

    memoryBs: BigNumber;
    memoryMajorFaults: BigNumber;

    lockTimeMs: BigNumber;
    lockRowTimeMs: BigNumber;

    logBufferTimeMs: BigNumber;
    logFlushTimeMs: BigNumber;
    logBufferWriteB: BigNumber;

    networkTimeMs: BigNumber;
    networkLogicalRecvB: BigNumber;
    networkLogicalSendB: BigNumber;
    // Sum of networkLogicalRecvB and networkLogicalSendB (computed in SQL).
    networkB: BigNumber;

    diskTimeMs: BigNumber;
    diskLogicalReadB: BigNumber;
    diskLogicalWriteB: BigNumber;
    diskPhysicalReadB: BigNumber;
    diskPhysicalWriteB: BigNumber;
    // Sum of diskLogicalReadB, diskLogicalWriteB and logBufferWriteB (computed in SQL).
    diskB: BigNumber;
};

// Sets the session variable `activities_delta_sleep_s` to the bound parameter.
// `mvFullFixedInterval` issues it with the requested interval right before
// querying SQL_MV_ACTIVITIES on the same connection.
const SQL_SET_DELTA_SLEEP = `
    SET SESSION activities_delta_sleep_s = ?
`;

/**
 * Computes `left - right` for every statistic using `monus`, which (per the
 * rationale documented in `buildMvFullAction`) yields zero rather than a
 * negative value when a counter appears to go backwards.
 *
 * Only the keys present in `right` are produced; `left` may carry extra
 * fields (for example a full SQL activity row), which are ignored.
 *
 * @param left the later snapshot; looked up by the statistic names of `right`.
 * @param right the earlier snapshot, subtracted from `left`.
 * @returns a new statistics object holding the per-key differences.
 */
const monusStatistics = (
    left: MvCumulativeStatistics & LooseObject,
    right: MvCumulativeStatistics
): MvCumulativeStatistics => {
    const difference = _.mapValues(right, (rightValue, statName) =>
        monus(left[statName], rightValue)
    );

    return difference as MvCumulativeStatistics;
};

/**
 * Extracts just the cumulative statistic counters from a SQL activity row,
 * dropping the identifying fields and the run/success/failure counts.
 *
 * @param act a row returned by one of the activity queries.
 * @returns a new object containing only the statistic fields of `act`.
 */
const actToCumulativeStats = (act: SQLMvActivity): MvCumulativeStatistics => {
    const {
        cpuTimeMs,
        cpuWaitTimeMs,
        elapsedTimeMs,
        memoryBs,
        memoryMajorFaults,
        lockTimeMs,
        lockRowTimeMs,
        logBufferTimeMs,
        logFlushTimeMs,
        logBufferWriteB,
        networkTimeMs,
        networkLogicalRecvB,
        networkLogicalSendB,
        networkB,
        diskTimeMs,
        diskLogicalReadB,
        diskLogicalWriteB,
        diskPhysicalReadB,
        diskPhysicalWriteB,
        diskB,
    } = act;

    return {
        // CPU
        cpuTimeMs,
        cpuWaitTimeMs,

        elapsedTimeMs,

        // memory
        memoryBs,
        memoryMajorFaults,

        // locks
        lockTimeMs,
        lockRowTimeMs,

        // log
        logBufferTimeMs,
        logFlushTimeMs,
        logBufferWriteB,

        // network
        networkTimeMs,
        networkLogicalRecvB,
        networkLogicalSendB,
        networkB,

        // disk
        diskTimeMs,
        diskLogicalReadB,
        diskLogicalWriteB,
        diskPhysicalReadB,
        diskPhysicalWriteB,
        diskB,
    };
};

/**
 * Converts counters accumulated over a time window into display statistics.
 *
 * Time counters and `elapsedTimeMs` are passed through unchanged. Byte and
 * fault counters, as well as `memoryBs`, are divided by the window length;
 * `cpuUsage` is the CPU time converted from milliseconds to seconds and then
 * divided by the window length.
 *
 * @param stats counters accumulated during the window.
 * @param deltaTimeS length of the window in seconds. When it is zero, every
 *     divided field becomes 0 instead of dividing by zero.
 * @returns the statistics for the window.
 */
function deltaStatsToMvStats(
    stats: MvCumulativeStatistics,
    deltaTimeS: BigNumber
): MvStatistics {
    // Guarded division: an empty window produces 0 rather than a division by zero.
    const perSecond = (amount: BigNumber) =>
        deltaTimeS.isZero() ? new BigNumber(0) : amount.dividedBy(deltaTimeS);

    const cpuTimeS = stats.cpuTimeMs.dividedBy(1000);

    return {
        cpuUsage: perSecond(cpuTimeS),

        cpuTime: stats.cpuTimeMs,
        cpuWaitTime: stats.cpuWaitTimeMs,
        elapsedTimeMs: stats.elapsedTimeMs,

        memoryB: perSecond(stats.memoryBs),
        memoryMajorFaults: perSecond(stats.memoryMajorFaults),

        lockTime: stats.lockTimeMs,
        lockRowTime: stats.lockRowTimeMs,

        logBufferTime: stats.logBufferTimeMs,
        logFlushTime: stats.logFlushTimeMs,
        logBufferWriteB: perSecond(stats.logBufferWriteB),

        networkTime: stats.networkTimeMs,
        networkLogicalRecvB: perSecond(stats.networkLogicalRecvB),
        networkLogicalSendB: perSecond(stats.networkLogicalSendB),
        networkB: perSecond(stats.networkB),

        diskTime: stats.diskTimeMs,
        diskLogicalReadB: perSecond(stats.diskLogicalReadB),
        diskLogicalWriteB: perSecond(stats.diskLogicalWriteB),
        diskPhysicalReadB: perSecond(stats.diskPhysicalReadB),
        diskPhysicalWriteB: perSecond(stats.diskPhysicalWriteB),
        diskB: perSecond(stats.diskB),
    };
}

/**
 * Builds the successful `MV_FULL_START` action from the snapshot taken when a
 * recording starts.
 *
 * Each SQL row is converted to a cumulative low-level activity and stored
 * under its `hashMvActivity` ID; when two rows hash to the same ID the later
 * row wins.
 *
 * @param sqlActivities rows of the cumulative activities query.
 * @param requestedStartDate when the start of the recording was requested.
 * @returns the action carrying the activity map, plus `deltaTimeS`: the
 *     number of seconds between `requestedStartDate` and the moment the
 *     action is built.
 */
const buildMvFullStartAction = (
    sqlActivities: Array<SQLMvActivity>,
    requestedStartDate: Date
): MvFullStartAction => {
    const toCumulativeActivity = (
        row: SQLMvActivity
    ): MvActivityLowLevelCumulative => ({
        kind: "LOW_LEVEL",

        // high level fields
        activityName: row.activityName,
        activityType: row.activityType,
        databaseName: nullableToMaybe(row.databaseName),
        queryText: nullableToMaybe(row.queryText),
        runCount: row.runCount,
        successCount: row.successCount,
        failureCount: row.failureCount,
        statistics: actToCumulativeStats(row),

        // low level fields
        nodeId: row.nodeId.toString(),
        // Rows without an aggregator activity fall back to their own name.
        aggregatorActivityName: row.aggregatorActivityName || row.activityName,
        partitionId: row.partitionId !== null ? row.partitionId : undefined,
    });

    const activities: {
        [id in MvActivityID]: MvActivityLowLevelCumulative
    } = {};

    sqlActivities.forEach(row => {
        const snapshot = toCumulativeActivity(row);
        activities[hashMvActivity(snapshot)] = snapshot;
    });

    const deltaTimeS = differenceInSeconds(new Date(), requestedStartDate);

    return {
        type: "MV_FULL_START",
        error: false,
        payload: {
            loading: false,
            data: { activities, deltaTimeS },
        },
    };
};

/**
 * Derives the ID under which an activity is stored and looked up.
 *
 * The ID is the "."-joined sequence of node ID, activity type, activity name,
 * aggregator activity name (falling back to the activity name when falsy),
 * database name and partition ID. `Array.prototype.join` renders `null` and
 * `undefined` as empty segments, so a SQL row and the cumulative activity
 * built from it produce the same ID.
 *
 * @param mvActivity a SQL activity row or a cumulative low-level activity.
 * @returns the activity ID.
 */
export const hashMvActivity = (
    mvActivity: SQLMvActivity | MvActivityLowLevelCumulative
): MvActivityID => {
    const {
        nodeId,
        activityType,
        activityName,
        aggregatorActivityName,
        databaseName,
        partitionId,
    } = mvActivity;

    const segments = [
        nodeId,
        activityType,
        activityName,
        aggregatorActivityName || activityName,
        databaseName,
        partitionId,
    ];

    return segments.join(".");
};

/**
 * Combines a SQL activity row with already-computed statistics into a
 * low-level activity.
 *
 * @param act the SQL row supplying identity and run/success/failure counts.
 * @param statistics the statistics to attach to the activity.
 * @returns the low-level activity; nullable SQL columns are converted to
 *     their optional counterparts.
 */
const makeLowActivity = (
    act: SQLMvActivity,
    statistics: MvStatistics
): MvActivityLowLevel => ({
    kind: "LOW_LEVEL",

    // high level fields
    activityName: act.activityName,
    activityType: act.activityType,
    databaseName: nullableToMaybe(act.databaseName),
    queryText: nullableToMaybe(act.queryText),
    runCount: act.runCount,
    successCount: act.successCount,
    failureCount: act.failureCount,
    statistics,

    // low level fields
    nodeId: act.nodeId.toString(),
    // Rows without an aggregator activity fall back to their own name.
    aggregatorActivityName: act.aggregatorActivityName || act.activityName,
    partitionId: act.partitionId !== null ? act.partitionId : undefined,
});

/**
 * Indexes a list of nodes by their `nodeId`.
 *
 * @param nodes the nodes to index.
 * @returns an object mapping each node ID to its node; when several nodes
 *     share an ID, the last one in `nodes` wins.
 */
function mvNodeArrayToMap(nodes: Array<MvNode>): { [id in MvNodeId]: MvNode } {
    const byId: { [id in MvNodeId]: MvNode } = {};

    nodes.forEach(node => {
        byId[node.nodeId] = node;
    });

    return byId;
}

/**
 * Builds the `MV_FULL` action for a fixed-interval profile.
 *
 * The rows passed in are used as-is as the counters for the interval (no
 * diffing against an earlier snapshot happens here); each row's counters are
 * converted with `deltaStatsToMvStats` using `deltaTimeS` as the window.
 *
 * @param nodes the cluster nodes, already converted to `MvNode`.
 * @param advancedCounters rows of the `read_advanced_counters` variable
 *     query. An empty result (no such variable row) is treated as "off"
 *     instead of throwing on `advancedCounters[0]`.
 * @param sqlActivities activity rows for the interval.
 * @param deltaTimeS the length of the interval in seconds; also echoed in the
 *     payload.
 * @returns the action produced by `mvFull` for the assembled payload.
 */
const buildFixedIntervalMvFullAction = ({
    nodes,
    sqlActivities,
    advancedCounters,
    deltaTimeS,
}: {
    nodes: Array<MvNode>;
    advancedCounters: Array<SQLAdvancedCounter>;
    sqlActivities: Array<SQLMvActivity>;
    deltaTimeS: number;
}): MvFullAction => {
    // The window is the same for every activity, so wrap it once.
    const windowS = new BigNumber(deltaTimeS);

    const activities = _.map(sqlActivities, sqlActivity =>
        makeLowActivity(sqlActivity, deltaStatsToMvStats(sqlActivity, windowS))
    );

    // The variable query returns no row when the variable is not reported;
    // guard the lookup so that case means "advanced counters off".
    const advancedCounterRow = advancedCounters[0];

    const mvFullPayload: MvFullPayload = {
        repr: {
            nodes: mvNodeArrayToMap(nodes),
            activities,
            advancedCounters:
                advancedCounterRow !== undefined &&
                advancedCounterRow.Value === "ON",
        },
        deltaTimeS,
    };

    return mvFull(mvFullPayload);
};

/**
 * Creates the function that turns the snapshot taken when a recording stops
 * into the final `MV_FULL` action.
 *
 * For every activity in the final snapshot:
 *  - if it also exists in the start snapshot and nothing happened during the
 *    recording (same success/failure counts, nothing running), it is dropped;
 *  - if it also exists in the start snapshot and was active, its counters are
 *    diffed against the start snapshot and averaged over the recording
 *    duration (at least one second);
 *  - otherwise it was created during the recording, so its cumulative
 *    counters are used directly and averaged over its own elapsed time.
 *
 * The rows of the final snapshot are not modified.
 *
 * @param startActivities activities captured at the start, keyed by
 *     `hashMvActivity`.
 * @param startTime when the recording was started.
 * @param requestedStopDate when the stop of the recording was requested.
 * @returns a function taking `[nodes, advancedCounters, finalSqlActivities]`
 *     and returning the `MV_FULL` action. An empty `advancedCounters` result
 *     is treated as "off".
 */
const buildMvFullAction = (
    startActivities: { [id in MvActivityID]: MvActivityLowLevelCumulative },
    startTime: Date,
    requestedStopDate: Date
) => ([nodes, advancedCounters, finalSqlActivities]: [
    Array<MvNode>,
    Array<SQLAdvancedCounter>,
    Array<SQLMvActivity>
]): MvFullAction => {
    // Duration of the recording, clamped to one second so that rates of
    // pre-existing activities are never computed over an empty window.
    const recordingDeltaTimeS = new BigNumber(
        Math.max(differenceInSeconds(new Date(), startTime), 1)
    );

    const deltaActivities: Array<MvActivityLowLevel> = [];
    _.forEach(finalSqlActivities, (finalAct: SQLMvActivity) => {
        const startAct = startActivities[hashMvActivity(finalAct)];

        if (!startAct) {
            // This activity was created DURING the profiling, which means
            // that whatever is in the cumulative statistics is what we want
            // to display to our users. Its window is its own elapsed time,
            // converted from milliseconds to seconds.
            const activityDeltaTimeS = finalAct.elapsedTimeMs.dividedBy(1000);

            deltaActivities.push(
                makeLowActivity(
                    finalAct,
                    deltaStatsToMvStats(
                        actToCumulativeStats(finalAct),
                        activityDeltaTimeS
                    )
                )
            );
            return;
        }

        // We found a corresponding activity in the initial snapshot, so this
        // activity has been running since before the profile recording
        // started. This means that we need to diff the two activities to make
        // sure that we only show data relevant to the duration of the
        // recording.

        // If this activity was never active during the profiling period,
        // we simply ignore it and don't include it in our Activities list.
        if (
            finalAct.successCount.eq(startAct.successCount) &&
            finalAct.failureCount.eq(startAct.failureCount) &&
            finalAct.runCount.isZero()
        ) {
            return;
        }

        // We use "monus" instead of the more expected "minus"
        // operation because counters are not 100% precise. By
        // using "monus", we guarantee we get at least sane values
        // (i.e. zero instead of uint64_t(-3)).
        //
        // For an example of a scenario where this might occur:
        // While we are iterating through lmv_tasks, we might see
        // a query and account its CPU time. By the time we get to
        // lmv_finished_tasks, the query has completed so its CPU
        // time has been aggregated into the activity. We now double
        // count time for this query. If we only did this for the
        // begin snapshot but not the end snapshot, we might see
        // counters go backwards.
        //
        // Finally, this code needs to be consistent with the equivalent
        // code in the engine that generates the `mv_activities`
        // table and that code uses `monus`, so we should use it too.

        // We do not change runCount because we want it to be the run count of the
        // final snapshot. This is the same behavior that the engine has.
        // The adjusted counts go into a copy so the caller's row stays intact.
        const adjustedAct: SQLMvActivity = {
            ...finalAct,
            successCount: monus(finalAct.successCount, startAct.successCount),
            failureCount: monus(finalAct.failureCount, startAct.failureCount),
        };

        const statistics = monusStatistics(finalAct, startAct.statistics);

        deltaActivities.push(
            makeLowActivity(
                adjustedAct,
                deltaStatsToMvStats(statistics, recordingDeltaTimeS)
            )
        );
    });

    // The variable query returns no row when the variable is not reported;
    // guard the lookup so that case means "advanced counters off".
    const advancedCounterRow = advancedCounters[0];

    const mvFullPayload: MvFullPayload = {
        repr: {
            nodes: mvNodeArrayToMap(nodes),
            activities: deltaActivities,
            advancedCounters:
                advancedCounterRow !== undefined &&
                advancedCounterRow.Value === "ON",
        },
        // NOTE(review): this is the time elapsed since the stop was
        // requested, not the recording duration used for the rates above —
        // confirm that consumers of the payload expect this.
        deltaTimeS: differenceInSeconds(new Date(), requestedStopDate),
    };

    return mvFull(mvFullPayload);
};

/**
 * Converts a row of the nodes query into the `MvNode` model.
 *
 * @param sqlNode the SQL row.
 * @returns the node, with `nodeId` stringified and the nullable columns
 *     passed through `nullableToMaybe`.
 */
function sqlNodeToMvNode(sqlNode: SQLMvNode): MvNode {
    const {
        nodeId,
        ipAddr,
        port,
        role,
        state,
        availabilityGroup,
        numCpus,
        maxMemoryMb,
        memoryUsedMb,
    } = sqlNode;

    return {
        nodeId: nodeId.toString(),
        ipAddr,
        port,
        role,
        state,
        availabilityGroup: nullableToMaybe(availabilityGroup),

        numCpus: nullableToMaybe(numCpus),
        maxMemoryMb: nullableToMaybe(maxMemoryMb),
        memoryUsedMb: nullableToMaybe(memoryUsedMb),
    };
}

/**
 * Action creator that starts a recording.
 *
 * The returned observable merges a loading `MV_FULL_START` action with the
 * outcome of snapshotting the cumulative activities on a pooled connection:
 * either the action built by `buildMvFullStartAction`, or an error
 * `MV_FULL_START` action carrying the failure's message. The connection is
 * released once the query chain settles, and both the query and the action
 * building are timed and logged.
 */
export const mvFullStart = makeActionCreator({
    name: "mvFullStart",

    handle(ctx: HandlerContext): Observable<MvFullStartAction> {
        const loading$ = Observable.of<MvFullStartAction>({
            type: "MV_FULL_START",
            error: false,
            payload: { loading: true, meta: { fixedInterval: false } },
        });

        const requestedStartDate = new Date();

        // Failures are logged and reported as an error action instead of
        // erroring the observable.
        const toErrorAction = (err: Error | MysqlError): MvFullStartAction => {
            logging.logError(err);

            return {
                type: "MV_FULL_START",
                error: true,
                payload: { message: err.message },
            };
        };

        const snapshot$ = Observable.fromPromise(
            ctx.manager
                .getPooledConnection()
                .then(conn => {
                    const queryStartedAt = self.performance.now();

                    return select(conn, SQL_MV_ACTIVITIES_CUMULATIVE)
                        .then((rows: Array<SQLMvActivity>) => {
                            const queryMs =
                                self.performance.now() - queryStartedAt;
                            logging.log(
                                "info",
                                "The mvFullStartQuery took (ms)",
                                queryMs
                            );

                            const buildStartedAt = self.performance.now();
                            const startAction = buildMvFullStartAction(
                                rows,
                                requestedStartDate
                            );
                            const buildMs =
                                self.performance.now() - buildStartedAt;
                            logging.log(
                                "info",
                                "Building the mvFullStart action took (ms)",
                                buildMs
                            );

                            return startAction;
                        })
                        .finally(() => conn.release());
                })
                .catch(toErrorAction)
        );

        return Observable.merge(loading$, snapshot$);
    },
});

/**
 * Action creator that stops a recording.
 *
 * The returned observable merges a loading `MV_FULL` action with the outcome
 * of the stop sequence, which runs on one pooled connection: the nodes and
 * the advanced-counters variable are queried first, then the cumulative
 * activities, and finally `buildMvFullAction` diffs that snapshot against
 * `startActivities`. Any failure is logged and reported as an error
 * `MV_FULL` action. The connection is released once the chain settles.
 */
export const mvFullStop = makeActionCreator({
    name: "mvFullStop",

    handle(
        ctx: HandlerContext,
        {
            startActivities,
            startTime, // when the MV recording was started
        }: {
            startActivities: {
                [id in MvActivityID]: MvActivityLowLevelCumulative
            };
            startTime: Date;
        }
    ): Observable<MvFullAction> {
        const loading$ = Observable.of<MvFullAction>({
            type: "MV_FULL",
            error: false,
            payload: { loading: true },
        });

        // when the user tried to stop MV recording
        const requestedStopDate = new Date();

        // Failures are logged and reported as an error action instead of
        // erroring the observable.
        const toErrorAction = (err: Error | MysqlError): MvFullAction => {
            logging.logError(err);

            return {
                type: "MV_FULL",
                error: true,
                payload: {
                    message: err.message,
                },
            };
        };

        const result$ = Observable.fromPromise(
            ctx.manager
                .getPooledConnection()
                .then(conn =>
                    multiSelect<SQLMvNode, SQLAdvancedCounter>(
                        conn,
                        { sql: SQL_MV_NODES },
                        { sql: SQL_GET_READ_ADVANCED_COUNTERS }
                    )
                        .then(([sqlNodes, advancedCounters]) => {
                            const queryStartedAt = self.performance.now();

                            return select(
                                conn,
                                SQL_MV_ACTIVITIES_CUMULATIVE
                            ).then(
                                (finalSqlActivities: Array<SQLMvActivity>) => {
                                    logging.log(
                                        "info",
                                        "The mvFullStopQuery took (ms)",
                                        self.performance.now() - queryStartedAt
                                    );

                                    const mvNodes = _.map(
                                        sqlNodes,
                                        sqlNodeToMvNode
                                    );

                                    const buildStartedAt = self.performance.now();

                                    const toStopAction = buildMvFullAction(
                                        startActivities,
                                        startTime,
                                        requestedStopDate
                                    );
                                    const stopAction = toStopAction([
                                        mvNodes,
                                        advancedCounters,
                                        finalSqlActivities,
                                    ]);

                                    logging.log(
                                        "info",
                                        "Building the mvFullStop action took (ms)",
                                        self.performance.now() - buildStartedAt
                                    );

                                    return stopAction;
                                }
                            );
                        })
                        .finally(() => conn.release())
                )
                .catch(toErrorAction)
        );

        return Observable.merge(loading$, result$);
    },
});

/**
 * Action creator that profiles the cluster over a fixed interval.
 *
 * The returned observable merges a loading `MV_FULL_START` action (flagged
 * `fixedInterval: true`) with the outcome of the profile. On one pooled
 * connection it queries the nodes and the advanced-counters variable, sets
 * `activities_delta_sleep_s` to `deltaTimeS`, then queries the activities and
 * builds the `MV_FULL` action with `buildFixedIntervalMvFullAction`.
 *
 * Like `mvFullStart` and `mvFullStop`, a failure anywhere in the chain is
 * logged and reported as an error action (`MV_FULL` with `error: true`)
 * rather than erroring the observable. The connection is released once the
 * query chain settles.
 *
 * NOTE(review): the session variable is left set on the pooled connection
 * when it is released — confirm the pool resets session state or that other
 * users of the connection are unaffected.
 */
export const mvFullFixedInterval = makeActionCreator({
    name: "mvFullFixedInterval",

    handle(
        ctx: HandlerContext,
        {
            deltaTimeS,
        }: {
            deltaTimeS: number;
        }
    ): Observable<MvFullAction | MvFullStartAction> {
        const $loading = Observable.of<MvFullStartAction>({
            type: "MV_FULL_START",
            error: false,
            payload: {
                loading: true,
                meta: { fixedInterval: true, deltaTimeS },
            },
        });

        const $compute = Observable.fromPromise(
            ctx.manager
                .getPooledConnection()
                .then(conn =>
                    multiSelect<SQLMvNode, SQLAdvancedCounter>(
                        conn,
                        { sql: SQL_MV_NODES },
                        { sql: SQL_GET_READ_ADVANCED_COUNTERS },
                        // The result of the SET statement is not used.
                        { sql: SQL_SET_DELTA_SLEEP, params: [deltaTimeS] }
                    )
                        .then(([nodes, advancedCounters]) =>
                            select(conn, SQL_MV_ACTIVITIES).then(
                                (sqlActivities: Array<SQLMvActivity>) =>
                                    buildFixedIntervalMvFullAction({
                                        nodes: _.map(nodes, sqlNodeToMvNode),
                                        advancedCounters,
                                        sqlActivities,
                                        deltaTimeS,
                                    })
                            )
                        )
                        .finally(() => conn.release())
                )
                .catch(
                    (err: Error | MysqlError): MvFullAction => {
                        logging.logError(err);

                        return {
                            type: "MV_FULL",
                            error: true,
                            payload: {
                                message: err.message,
                            },
                        };
                    }
                )
        );

        return Observable.merge($loading, $compute);
    },
});
