import { AnySourceStream, Stream, Subscription } from '../stream'

/**
 * Stream that relays the values and errors of several source streams and
 * completes once every one of those sources has completed.
 */
export class Merge<V, E, C> extends Stream<V, E, C> {
    /** Sources that have not completed yet, each paired with the subscription used for it. */
    protected sources = new Map<AnySourceStream<V, E, C>, Subscription<V, E, C>>()

    /**
     * Prepares one subscription per source. Nothing is subscribed here; the
     * subscriptions are attached to their sources in `connect()`.
     *
     * @param sources Streams whose output should be merged.
     */
    constructor(sources: AnySourceStream<V, E, C>[]) {
        super()

        // A single pair of relay callbacks is shared by all subscriptions.
        const relayValue = (val: V) => this._value(val)
        const relayError = (err: E) => this._error(err)

        const prepare = (source: AnySourceStream<V, E, C>) => {
            const subscription = {
                value: relayValue,
                error: relayError,
                complete: (completion: C) => {
                    // Drop the finished source first, so the size check below
                    // only counts sources that are still pending.
                    source.unsubscribe(subscription)
                    this.sources.delete(source)
                    if (this.sources.size !== 0) {
                        return
                    }
                    // The last pending source just finished: complete with its value.
                    this._complete(completion)
                },
            }
            this.sources.set(source, subscription)
        }

        for (const source of sources) {
            prepare(source)
        }
    }

    /** Connects this stream, then subscribes to every source that is still tracked. */
    override connect(): void {
        super.connect()
        this.sources.forEach((subscription, source) => {
            source.subscribe(subscription)
        })
    }

    /** Disconnects this stream, then unsubscribes from every source that is still tracked. */
    override disconnect(): void {
        super.disconnect()
        this.sources.forEach((subscription, source) => {
            source.unsubscribe(subscription)
        })
    }
}

// Module augmentation: adds the `merge` method to the `Stream` interface
// exported by '../stream'. The runtime implementation is assigned to
// `Stream.prototype.merge` further down in this file.
declare module '../stream' {
    // eslint-disable-next-line no-shadow
    interface Stream<SV, SE = never, SC = never, DV = SV, DE = SE, DC = SC> {
        /**
         * Merges this stream with the given `sources`.
         *
         * The declared result type widens this stream's `DV`, `DE` and `DC`
         * with the corresponding type parameters of the additional sources.
         *
         * NOTE(review): `MirrorStream` is not imported in this file; presumably
         * it resolves from '../stream' — confirm.
         *
         * @param sources Additional streams to merge with this one.
         * @returns A stream typed over the union of both sides' type parameters.
         */
        merge<_DV, _DE, _DC>(...sources: AnySourceStream<_DV, _DE, _DC>[]): MirrorStream<DV | _DV, DE | _DE, DC | _DC>
    }
}

/**
 * Runtime implementation of `Stream#merge`: builds a `Merge` over the
 * receiver followed by the extra `sources`, in that order.
 */
Stream.prototype.merge = function <V, E, C>(this: AnySourceStream<V, E, C>, ...sources: AnySourceStream<V, E, C>[]) {
    // The receiver goes first, ahead of the streams passed as arguments.
    const merged = [this, ...sources] // tslint:disable-line:no-invalid-this
    return new Merge(merged)
}
