import { AnySourceStream, AnyStream, AnySubscription, Stream } from '../stream'
import { Pass } from './pass'

/**
 * Pass-through stage that collects values from its source and releases them
 * downstream as one array every time the `signal` stream emits a value.
 *
 * Terminal events from the source are deferred: `complete`/`error` are only
 * recorded when they arrive and are emitted by the next `flush()`, right
 * after the pending batch. When the signal itself completes or errors, the
 * pending batch is flushed and `close()` is called.
 *
 * @typeParam V - type of the values received from the source
 * @typeParam E - type of the source's error event
 * @typeParam C - type of the source's complete event
 */
export class Buffer<V, E = never, C = never> extends Pass<V, E, C, V[]> {
    /** Values received from the source since the last flush. */
    protected bufferQueue: V[] = []
    /** Set once the source has errored or completed; later source events are ignored. */
    protected toFinish = false
    /** True when a source `complete` is waiting to be emitted by `flush()`. */
    protected toComplete = false
    /** True when a source `error` is waiting to be emitted by `flush()`. */
    protected toError = false
    /** Payload of the recorded source `complete`, if any. */
    protected completeEvent: C | undefined
    /** Payload of the recorded source `error`, if any. */
    protected errorEvent: E | undefined

    /** Stream whose events decide when the buffered values are released. */
    protected signal: AnyStream

    /**
     * Subscription registered on `signal` by `connect()`.
     * A value flushes the buffer; complete and error flush it and then call `close()`.
     */
    protected signalSubscription: AnySubscription = {
        value: () => {
            this.flush()
        },
        complete: () => {
            this.flush()
            this.close()
        },
        error: () => {
            this.flush()
            this.close()
        },
    }

    /**
     * @param signal - stream whose events trigger a flush
     * @param source - optional upstream, handed to the `Pass` constructor
     */
    constructor(signal: AnyStream, source?: AnySourceStream<V, E, C>) {
        super(source)
        this.signal = signal
    }

    /** Queues a source value; dropped if the source already errored or completed. */
    override value(value: V): void {
        if (this.toFinish) {
            return
        }
        this.bufferQueue.push(value)
    }

    /** Records the source error so the next `flush()` emits it after the pending batch. */
    override error(error: E): void {
        if (this.toFinish) {
            return
        }
        this.toFinish = true
        this.toError = true
        this.errorEvent = error
    }

    /** Records the source completion so the next `flush()` emits it after the pending batch. */
    override complete(complete: C): void {
        if (this.toFinish) {
            return
        }
        this.toFinish = true
        this.toComplete = true
        this.completeEvent = complete
    }

    /**
     * Emits the values buffered so far as a single array (possibly empty),
     * followed by the recorded source `complete` or `error`, if one is pending.
     *
     * NOTE(review): the terminal flags are not cleared after delivery, so if
     * `_complete`/`_error` do not tear the stream down, a later flush would
     * deliver the terminal event again — confirm against the base class.
     */
    flush(): void {
        // Detach the pending batch before notifying anyone. A subscriber may
        // synchronously feed new values or trigger another flush; those must
        // go into a fresh queue rather than mutate the array being delivered
        // (and then be discarded). Resetting first also means a throwing
        // subscriber cannot cause the same batch to be emitted twice.
        const batch = this.bufferQueue
        this.bufferQueue = []

        this._value(batch)

        if (this.toComplete) {
            this._complete(this.completeEvent!)
        } else if (this.toError) {
            this._error(this.errorEvent!)
        }
    }

    /** Connects via the base class, then starts listening to the signal. */
    override connect(): void {
        super.connect()
        this.signal.subscribe(this.signalSubscription)
    }

    /** Disconnects via the base class, then stops listening to the signal. */
    override disconnect(): void {
        super.disconnect()
        this.signal.unsubscribe(this.signalSubscription)
    }
}

// Module augmentation: adds the `buffer` operator to the `Stream` interface
// exported by '../stream'. The type parameter list must match the original
// `Stream` declaration for the interfaces to merge.
declare module '../stream' {
    // eslint-disable-next-line no-shadow
    interface Stream<SV, SE = never, SC = never, DV = SV, DE = SE, DC = SC> {
        /**
         * Groups this stream's output values into arrays.
         *
         * @param signal - stream that controls when the collected values are released
         * @returns a stream whose values are arrays of this stream's output
         * values (`DV[]`); the error and complete types are unchanged
         */
        buffer(signal: AnyStream): Stream<DV, DE, DC, DV[], DE, DC>
    }
}

/**
 * Runtime implementation of the `buffer` operator: wraps the stream it is
 * called on in a `Buffer` that is driven by `signal`.
 */
Stream.prototype.buffer = function <DV, DE, DC>(this: AnySourceStream<DV, DE, DC>, signal: AnyStream) {
    // tslint:disable-next-line:no-invalid-this
    const buffered = new Buffer(signal, this)
    return buffered
}
