import { AnyMessage, MessageType } from '../message'
import { AnySourceStream, AnyStream, AnySubscription, Stream } from '../stream'
import { Pass } from './pass'

/**
 * Bookkeeping shared by every `Schedule` that is driven by the same signal stream.
 */
interface SignalScheduleEntry {
    /** Messages waiting to be published, each paired with the stream that must publish it. */
    queue: { message: AnyMessage; stream: AnyStream }[]
    /** The single subscription object used to listen to the signal. */
    subscription: AnySubscription
    /** Streams currently registered with this signal. */
    subscribers: Set<AnyStream>
    /** How many registered streams are currently connected. */
    connectedSubscribersCount: number
}

/** Registry of per-signal bookkeeping, keyed by the signal stream itself. */
const scheduleMap = new Map<AnyStream, SignalScheduleEntry>()

/**
 * Stream that holds back every message it receives and republishes the backlog
 * each time `signal` emits a value.
 *
 * All `Schedule` instances created for the same `signal` share one entry in
 * `scheduleMap` (one queue and one subscription to the signal), so messages
 * buffered by different instances are published in the order they were queued.
 * When the signal itself errors or completes, the backlog is published and every
 * instance still registered is completed without a value - hence the `C | void`
 * completion type.
 */
export class Schedule<V, E = never, C = never> extends Pass<V, E, C, V, E, C | void> {
    /** Set once an error or completion has been queued; later messages are dropped. */
    protected toFinish = false
    /** Queue shared with every other `Schedule` bound to the same signal. */
    protected queue: { message: AnyMessage; stream: AnyStream }[]

    /** Stream whose emissions trigger publication of the queue. */
    protected signal: AnyStream

    /**
     * @param signal Stream that decides when buffered messages are published.
     * @param source Optional source stream, handed to the `Pass` constructor unchanged.
     */
    constructor(signal: AnyStream, source?: AnySourceStream<V, E, C>) {
        super(source)

        this.signal = signal

        if (!scheduleMap.has(signal)) {
            this.setupSignal(signal)
        }

        const schedule = scheduleMap.get(signal)!
        schedule.subscribers.add(this as any)
        this.queue = schedule.queue
    }

    /** Buffers a value until the next emission of the signal. */
    override value(value: V): void {
        if (this.toFinish) {
            return
        }
        this.enqueue({
            type: MessageType.Value,
            value: value,
        })
    }

    /** Buffers an error and stops accepting further messages. */
    override error(error: E): void {
        if (this.toFinish) {
            return
        }
        this.toFinish = true
        this.enqueue({
            type: MessageType.Error,
            error,
        })
    }

    /** Buffers a completion and stops accepting further messages. */
    override complete(complete: C): void {
        if (this.toFinish) {
            return
        }
        this.toFinish = true
        this.enqueue({
            type: MessageType.Complete,
            complete,
        })
    }

    /**
     * Connects this stream and, when it is the first connected instance for the
     * signal, subscribes the shared subscription to the signal.
     */
    override connect(): void {
        super.connect()

        const schedule = this.sharedState()
        if (schedule === undefined) {
            // The signal has already finished; there is nothing left to listen to.
            return
        }
        if (schedule.connectedSubscribersCount === 0) {
            this.signal.subscribe(schedule.subscription)
        }
        schedule.connectedSubscribersCount++
    }

    /**
     * Disconnects this stream and, when it was the last connected instance for
     * the signal, removes the shared subscription from the signal.
     */
    override disconnect(): void {
        super.disconnect()

        const schedule = this.sharedState()
        if (schedule === undefined) {
            // Entry was already discarded when the signal finished.
            return
        }
        schedule.connectedSubscribersCount--
        if (schedule.connectedSubscribersCount === 0) {
            this.signal.unsubscribe(schedule.subscription)
        }
    }

    /**
     * Unregisters this stream from the signal's bookkeeping before closing.
     * NOTE(review): `disconnect()` is invoked here when the last registered
     * stream closes, regardless of whether this stream was ever connected -
     * behaviour kept as in the original; confirm against `Pass`/`Stream`.
     */
    protected override close(): void {
        const schedule = this.sharedState()
        if (schedule !== undefined) {
            schedule.subscribers.delete(this as any)
            if (schedule.subscribers.size === 0) {
                this.disconnect()
            }
        }
        // Always runs, even when the shared entry is already gone.
        super.close()
    }

    /** Appends `message` to the shared queue, tagged with this stream as its publisher. */
    private enqueue(message: AnyMessage): void {
        this.queue.push({
            stream: this as any,
            message,
        })
    }

    /**
     * Returns the registry entry this instance was registered in, or `undefined`
     * when that entry has been discarded (the signal finished). Queue identity is
     * used so that an entry re-created later for the same signal by a newer
     * `Schedule` is never mistaken for this instance's own.
     */
    private sharedState() {
        const schedule = scheduleMap.get(this.signal)
        return schedule !== undefined && schedule.queue === this.queue ? schedule : undefined
    }

    /**
     * Creates and registers the shared bookkeeping for `signal`: an empty queue,
     * an empty subscriber set and the subscription that drives publication.
     */
    private setupSignal(signal: AnyStream): void {
        const queue: { message: AnyMessage; stream: AnyStream }[] = []
        const subscribers = new Set<AnyStream>()

        // Publishes the backlog in order. Array iteration also visits entries
        // appended while publishing, so those go out in the same pass.
        const flush = () => {
            for (const { message, stream } of queue) {
                stream.internalPublish(message)
            }
            queue.length = 0
        }

        // Runs when the signal errors or completes.
        const finish = () => {
            flush()
            for (const sub of subscribers) {
                sub.complete()
            }
            // `Schedule.complete` only enqueues, so the completions queued above
            // must be published before the entry disappears; previously they
            // were silently dropped.
            flush()
            // Only remove the entry if it is still the one created here.
            if (scheduleMap.get(signal)?.queue === queue) {
                scheduleMap.delete(signal)
            }
        }

        scheduleMap.set(signal, {
            queue,
            subscription: {
                value: () => {
                    flush()
                },
                complete: finish,
                error: finish,
            },
            connectedSubscribersCount: 0,
            subscribers,
        })
    }
}

// Type-level half of the `schedule` operator: adds the method to the `Stream` interface.
declare module '../stream' {
    // eslint-disable-next-line no-shadow
    interface Stream<SV, SE = never, SC = never, DV = SV, DE = SE, DC = SC> {
        /**
         * Returns a stream that buffers this stream's messages and publishes
         * them whenever `signal` emits a value.
         *
         * @param signal Stream whose emissions trigger publication of the buffered messages.
         */
        schedule(signal: AnyStream): MirrorStream<DV, DE, DC>
    }
}

/**
 * Runtime half of the `schedule` operator: wraps the receiving stream in a
 * `Schedule` that is driven by `signal`.
 */
Stream.prototype.schedule = function <DV, DE, DC>(this: AnySourceStream<DV, DE, DC>, signal: AnyStream) {
    const scheduled = new Schedule(signal, this) // tslint:disable-line:no-invalid-this
    return scheduled
}
