import { Observable, Observer } from 'rxjs';

/**
 * Minimal stand-in for a `ReadableStreamDefaultReader` that yields exactly one
 * chunk (the value passed to the constructor) and then reports end-of-stream.
 *
 * `cancel()` and `releaseLock()` both mark the reader as finished, so any
 * subsequent `read()` resolves with a `done` result.
 */
export class RSOMockReader<T> {
    /** True once the single chunk was handed out or the reader was cancelled/released. */
    private done = false;

    /**
     * @param data The single chunk that the first `read()` call resolves with.
     */
    constructor(private data: T) {}

    /**
     * Resolves with the stored chunk on the first call and with a `done`
     * result on every later call (or on any call after `cancel()` /
     * `releaseLock()`).
     *
     * @returns A promise for the next read result.
     */
    read(): Promise<ReadableStreamReadResult<T>> {
        if (this.done) {
            // Spell out `value` explicitly: some revisions of the DOM typings
            // require the key to be present on a "done" result.
            const finished: ReadableStreamReadResult<T> = { done: true, value: undefined };
            return Promise.resolve(finished);
        }

        this.done = true;
        const chunk: ReadableStreamReadResult<T> = { done: false, value: this.data };
        return Promise.resolve(chunk);
    }

    /**
     * Marks the reader as finished.
     *
     * @returns A promise that resolves immediately.
     */
    cancel() {
        this.done = true;
        return Promise.resolve();
    }

    /**
     * Resolved promise once the reader is finished; rejected promise (with an
     * `undefined` reason) while the chunk has not been consumed yet.
     *
     * A new promise is created on every access.
     */
    get closed(): Promise<void> {
        if (this.done) {
            return Promise.resolve();
        }

        const notClosed: Promise<void> = Promise.reject();
        // Mark the rejection as handled so that merely reading this property
        // does not raise an unhandled-rejection error. Callers that attach
        // their own handlers still observe the rejection.
        notClosed.catch(() => undefined);
        return notClosed;
    }

    /**
     * Marks the reader as finished.
     *
     * @returns A promise that resolves immediately (kept for compatibility
     * with existing callers of this mock).
     */
    releaseLock() {
        this.done = true;
        return Promise.resolve();
    }
}

/**
 * Observable adapter around a `ReadableStreamDefaultReader`.
 *
 * On subscription the reader is read in a loop: every chunk is forwarded to
 * the subscriber via `next`, a `done` result completes the subscriber, and a
 * rejected `read()` (or an exception thrown while delivering a chunk) is
 * forwarded via `error`. Unsubscribing cancels the reader.
 *
 * Every subscription reads from the one reader given to the constructor.
 */
export class ReadableStreamObservable<T> extends Observable<T> {
    /**
     * @param reader Reader whose chunks are emitted to subscribers.
     */
    constructor(private reader: ReadableStreamDefaultReader<T>) {
        super((observer: Observer<T>) => {
            // Subscription state lives in this closure rather than on the
            // instance, so a later subscribe() cannot redirect the
            // notifications of an earlier one.
            let active = true;

            const pump = (): void => {
                this.reader
                    .read()
                    .then((chunk) => {
                        if (!active) {
                            // Unsubscribed while the read was in flight.
                            return;
                        }
                        if (chunk.done) {
                            observer.complete();
                        } else {
                            observer.next(chunk.value);
                            pump();
                        }
                    })
                    .catch((err: unknown) => {
                        if (active) {
                            observer.error(err);
                        }
                    });
            };

            pump();

            return {
                unsubscribe: () => {
                    active = false;
                    // Teardown has no channel to report a failure to, so a
                    // rejected cancel() is deliberately ignored instead of
                    // surfacing as an unhandled rejection.
                    this.reader.cancel().catch(() => undefined);
                },
            };
        });
    }
}
