import _ from "lodash";

import EventEmitter from "eventemitter3";
import { Observable, Observer } from "rxjs";
import { jsonReplacer, jsonReviver } from "util/json";

export type StreamId = string;

export type Request = {
    kind: "request";
    action: string;
    args: { [key: string]: any };
};

export type ResponseFrame = {
    kind: "response-frame";
    data: any;
};

export type ResponseError = {
    kind: "response-error";
    error: any;
};

export type ResponseEnd = {
    kind: "response-end";
};

export type Payload = Request | ResponseFrame | ResponseError | ResponseEnd;
type Message = [StreamId, Payload];

const nextStreamId = (() => {
    let counter = 0;
    return (): StreamId => `s${counter++}`;
})();

export default class WorkerProtocol extends EventEmitter {
    worker: Worker;

    constructor(worker: Worker) {
        super();
        this.worker = worker;
        this.worker.onmessage = this._recv.bind(this);
    }

    // Send the request and return an Observable that streams the
    // worker's response.
    stream(request: Request): Observable<unknown> {
        const streamId = nextStreamId();
        return Observable.create((observer: Observer<unknown>) => {
            let handler = (r: Payload) => {
                switch (r.kind) {
                    case "request":
                        throw new Error(
                            `Received unexpected response in client: ${JSON.stringify(
                                r
                            )}`
                        );

                    case "response-frame":
                        observer.next(r.data);
                        break;

                    case "response-error":
                        this.removeListener(streamId, handler);
                        observer.error(r.error);
                        break;

                    case "response-end":
                        this.removeListener(streamId, handler);
                        observer.complete();
                        break;
                }
            };

            // subscribe to the stream
            this.on(streamId, handler);
            // send the request to the worker
            this.send(streamId, request);

            // on observable unsubscription, stop receiving stream events
            return () => this.removeListener(streamId, handler);
        });
    }

    // Send a request or response to a specific stream
    send(streamId: StreamId, payload: Payload): void {
        let msg: Message = [streamId, payload];
        try {
            let json = JSON.stringify(msg, jsonReplacer);
            this.worker.postMessage(json);
        } catch (err) {
            console.error("Failed to serialize message to JSON:", err);
        }
    }

    _recv(msg: MessageEvent): void {
        if (typeof msg.data !== "string") {
            console.error("Received non-string message from webworker", msg);
            return;
        }
        let json = msg.data;

        let streamId, payload;
        try {
            let message: Message = JSON.parse(json, jsonReviver);
            [streamId, payload] = message;
        } catch (err) {
            return console.error(
                "Failed to parse message from worker:",
                msg,
                err
            );
        }

        if (this.listenerCount(streamId) > 0) {
            this.emit(streamId, payload);
        } else {
            this.emit("stream", streamId, payload);
        }
    }
}
