import isFunction from 'lodash/isFunction'
import noop from 'lodash/noop'
import { Message, MessageType } from './message'
import { AsyncResource } from '../async/AsyncResource'
/**
 * A minimal publish/subscribe stream.
 *
 * The class has two type "sides":
 *  - the source side (`SV`, `SE`, `SC`): what producers push in through
 *    `value()`, `error()`, `complete()` and `publish()`;
 *  - the drain side (`DV`, `DE`, `DC`): what subscribers receive. The drain
 *    types default to the source types.
 *
 * This base class forwards source messages to subscribers unchanged (via an
 * unchecked cast), so it is only type-sound when both sides agree.
 * NOTE(review): presumably subclasses override `value`/`error`/`complete` to
 * transform messages and then emit through `_value`/`_error`/`_complete` —
 * confirm against the subclasses.
 *
 * Lifecycle, as implemented here:
 *  - the first subscription calls `connect()`, removing the last one calls
 *    `disconnect()`;
 *  - `_complete()` closes the stream; a closed stream drops every further
 *    message and ignores new subscriptions.
 *
 * Handlers are invoked through `runInAsyncScope` inherited from
 * `AsyncResource`, with the subscription object as `this`.
 */
export class Stream<SV, SE = never, SC = never, DV = SV, DE = SE, DC = SC>
    extends AsyncResource
    implements FullSubscription<SV, SE, SC>
{
    constructor() {
        super('Stream')
    }

    /** True once `close()` has run; the stream is then permanently inert. */
    closed = false
    /** True between `connect()` and `disconnect()`. */
    connected = false

    /** Currently registered subscriptions, in insertion order. */
    protected subscriptions = new Set<Subscription<DV, DE, DC>>()

    /** Marks the stream as connected. Called automatically by the first `subscribe()`. */
    connect(): void {
        this.connected = true
    }

    /** Marks the stream as disconnected. Called automatically when the last subscription is removed. */
    disconnect(): void {
        this.connected = false
    }

    /** Pushes a value into the stream from the source side. */
    value(value: SV): void {
        // The base class passes source values straight through to the drain side.
        this._value(value as any)
    }

    /** Pushes an error into the stream from the source side. */
    error(error?: SE): void {
        this._error(error as any)
    }

    /** Completes the stream from the source side. */
    complete(complete?: SC): void {
        this._complete(complete as any)
    }

    /**
     * Subscribes `value` for a single delivery and then unsubscribes it.
     *
     * @param value Handler invoked with the first value that is delivered.
     * @returns A function that cancels the subscription.
     */
    subscribeOnce(value: (val: DV) => void): () => void {
        let done = false
        // `subscribe()` may call `connect()`, and an overridden `connect()` may
        // emit synchronously, i.e. before `subscribe()` has returned. The
        // handle therefore starts out as a no-op instead of being a `const`
        // that the callback would read inside its temporal dead zone.
        let unSub: () => void = noop

        unSub = this.subscribe((val) => {
            if (done) {
                // Already delivered during a synchronous burst; the real
                // unsubscribe below has not had a chance to run yet.
                return
            }
            value(val)
            done = true
            unSub()
        })

        // The value arrived while `unSub` was still the no-op: unsubscribe now.
        if (done) {
            unSub()
        }

        return unSub
    }

    /**
     * Registers a subscription.
     *
     * A bare function is wrapped in a fresh `{ value }` subscription object.
     * On a closed stream nothing is registered and a no-op is returned.
     *
     * @returns A function that removes the subscription again.
     * @throws Error if the very same subscription object is already registered.
     */
    subscribe(value: (val: DV) => void): () => void
    subscribe(subscription: Subscription<DV, DE, DC>): () => void
    subscribe(valueOrSubscription?: Subscription<DV, DE, DC> | ((value: DV) => void)): () => void {
        const isSubscription = !isFunction(valueOrSubscription)

        const subscription = isSubscription
            ? (valueOrSubscription as Subscription<DV, DE, DC>)
            : { value: valueOrSubscription as (value: DV) => void }

        if (this.subscriptions.has(subscription)) {
            throw new Error(`already subscribed`)
        }
        if (this.closed) {
            return noop
        }

        this.subscriptions.add(subscription)

        // First subscriber on a disconnected stream: connect.
        if (!this.connected && this.subscriptions.size === 1) {
            this.connect()
        }

        return () => this.unsubscribe(subscription)
    }

    /**
     * Removes a subscription, given either the subscription object or the
     * value handler it was created from (the first subscription whose `value`
     * is that exact function is removed).
     *
     * An unknown subscription is ignored; in development builds a warning is
     * logged. Removing the last subscription disconnects the stream.
     */
    unsubscribe(value: (val: DV) => void): void
    unsubscribe(subscription: Subscription<DV, DE, DC>): void
    unsubscribe(valueOrSubscription: Subscription<DV, DE, DC> | ((value: DV) => void)): void {
        let subscription: Subscription<DV, DE, DC> | undefined

        if (isFunction(valueOrSubscription)) {
            for (const candidate of this.subscriptions) {
                if (candidate.value === valueOrSubscription) {
                    subscription = candidate
                    break
                }
            }
        } else {
            subscription = valueOrSubscription as Subscription<DV, DE, DC>
        }

        // `Set.delete` reports whether the entry was actually present.
        if (subscription === undefined || !this.subscriptions.delete(subscription)) {
            if (process.env.NODE_ENV === 'development') {
                // eslint-disable-next-line no-console
                console.warn(`subscription not found`)
            }
            // Nothing was removed, so the connection state must not change —
            // in every environment, not only outside development.
            return
        }

        if (this.connected && this.subscriptions.size === 0) {
            this.disconnect()
        }
    }

    /** Dispatches a source-side message to `value()`, `error()` or `complete()`. */
    publish(message: Message<SV, SE, SC>): void {
        switch (message.type) {
            case MessageType.Value:
                this.value(message.value)
                break
            case MessageType.Error:
                this.error(message.error)
                break
            case MessageType.Complete:
                this.complete(message.complete)
                break
        }
    }

    /** Dispatches a drain-side message directly to the subscribers, bypassing `value()`/`error()`/`complete()`. */
    internalPublish(message: Message<DV, DE, DC>): void {
        switch (message.type) {
            case MessageType.Value:
                this._value(message.value)
                break
            case MessageType.Error:
                this._error(message.error)
                break
            case MessageType.Complete:
                this._complete(message.complete)
                break
        }
    }

    /** Permanently closes the stream: disconnects if connected and drops all subscriptions. Idempotent. */
    protected close(): void {
        if (this.closed) {
            return
        }
        this.closed = true

        if (this.connected) {
            this.disconnect()
        }

        this.subscriptions.clear()
    }

    /** Delivers a value to every subscription that has a `value` handler. No-op once closed. */
    protected _value(value: DV): void {
        if (this.closed) {
            return
        }

        for (const subscription of this.subscriptions) {
            if (subscription.value) {
                this.runInAsyncScope(subscription.value, subscription, value)
            }
        }
    }

    /**
     * Delivers an error to every subscription that has an `error` handler.
     * No-op once closed.
     *
     * @throws The error itself when the stream is open and no subscription
     * has an `error` handler, so that it is not silently lost.
     */
    protected _error(error: DE): void {
        // Must come first: `close()` clears the subscriptions, so checking for
        // listeners before this guard would make every late error throw.
        if (this.closed) {
            return
        }

        const hasErrorListener = [...this.subscriptions.values()].some((x) => !!x.error)
        if (!hasErrorListener) {
            throw error
        }

        for (const subscription of this.subscriptions) {
            if (subscription.error) {
                this.runInAsyncScope(subscription.error, subscription, error)
            }
        }
    }

    /**
     * Notifies `complete` handlers (only while connected) and then closes the
     * stream. No-op once closed.
     */
    protected _complete(complete: DC): void {
        if (this.closed) {
            return
        }

        if (this.connected) {
            for (const subscription of this.subscriptions) {
                if (subscription.complete) {
                    this.runInAsyncScope(subscription.complete, subscription, complete)
                }
            }
        }

        this.close()
    }
}

/** Stream whose drain (subscriber-facing) types are the same as its source (producer-facing) types. */
export type MirrorStream<V, E = never, C = never> = Stream<V, E, C>
/** Stream that delivers `V`/`E`/`C` to subscribers; its source-side types are `unknown`. */
export type AnySourceStream<V, E = never, C = never> = Stream<unknown, unknown, unknown, V, E, C>
/** Stream that delivers `V`/`E`/`C` to subscribers; its source-side types are `never`. */
export type SourcelessStream<V, E = never, C = never> = Stream<never, never, never, V, E, C>
/** Stream that accepts `V`/`E`/`C` on the source side; its drain-side types are `never`. */
export type DrainlessStream<V, E = never, C = never> = Stream<V, E, C, never, never, never>
/** Stream that accepts `V`/`E`/`C` on the source side; its drain-side types are `unknown`. */
export type AnyDrainStream<V, E = never, C = never> = Stream<V, E, C, unknown, unknown, unknown>
/** Stream with `unknown` for every source type (and, through the defaults, every drain type). */
export type AnyStream = Stream<unknown, unknown, unknown>

/**
 * A set of optional handlers for the three message kinds of a `Stream`.
 * `Stream` only invokes the handlers that are present.
 */
export interface Subscription<V, E = never, C = never> {
    /** Receives each value. */
    value?(value: V): void
    /** Receives an error. */
    error?(error: E): void
    /** Receives the completion payload. */
    complete?(complete: C): void
}

/** A `Subscription` in which all three handlers are required. `Stream` itself implements this for its source-side types. */
export interface FullSubscription<V, E = never, C = never> extends Subscription<V, E, C> {
    value(value: V): void
    error(error: E): void
    complete(complete: C): void
}

/** A `Subscription` whose value, error and completion payloads are all `unknown`. */
export type AnySubscription = Subscription<unknown, unknown, unknown>

/**
 * Resolves to the drain value type (fourth type argument) when `OriginalValue`
 * is a `Stream`; otherwise resolves to `OriginalValue` unchanged.
 */
export type UnpackedStream<OriginalValue> = OriginalValue extends Stream<unknown, unknown, unknown, infer StreamValue>
    ? StreamValue
    : OriginalValue
/**
 * Like `UnpackedStream`, but additionally unwraps a `Promise`: resolves to the
 * drain value type of a `Stream`, else to the resolved type of a `Promise`
 * (one level only), else to `OriginalValue` unchanged.
 */
export type UnpackedStreamOrPromise<OriginalValue> = OriginalValue extends Stream<
    unknown,
    unknown,
    unknown,
    infer StreamValue
>
    ? StreamValue
    : OriginalValue extends Promise<infer PromiseValue>
    ? PromiseValue
    : OriginalValue
