import { BehaviorSubject, merge, NEVER, Observable, Subject } from 'rxjs';
import { catchError, switchMap, takeUntil } from 'rxjs/operators';
import { HttpErrorResponse } from '@angular/common/http';

export class SourceDataCache<T> {
    private readonly _source$: Observable<T>;

    private _cache$: BehaviorSubject<T | null>;

    private _update$ = new Subject<void>();
    private _set$ = new Subject<T | null>();
    private _complete$ = new Subject<void>();
    private _onError$ = new Subject<HttpErrorResponse | Error>();

    constructor(source: Observable<T>) {
        this._source$ = source;
        this._cache$ = new BehaviorSubject<T | null>(null);

        this.handleCacheValueUpdates();
    }

    get dataStream(): Observable<T | null> {
        return this._cache$.asObservable();
    }

    get dataSnapshot(): T | null {
        return this._cache$.getValue();
    }

    get onError$(): Observable<HttpErrorResponse | Error> {
        return this._onError$.asObservable();
    }

    /**
     * Takes the data from the source
     */
    update(): void {
        this._update$.next();
    }

    /**
     * Pushes new value to the data stream
     * @param {T} value
     */
    set(value: T | null): void {
        this._set$.next(value);
    }

    /**
     * Completes the data stream and clears cache
     */
    complete(): void {
        this._complete$.next();
        this._complete$.complete();
        this._cache$.complete();
        this._update$.complete();
        this._set$.complete();
    }

    private handleCacheValueUpdates(): void {
        // Emits when update function is triggered, then gets a value from a source
        const update$ = this._update$.asObservable().pipe(
            switchMap(() =>
                this._source$.pipe(
                    // If error occurs, not emit a value
                    catchError((error: HttpErrorResponse | Error) => {
                        this._onError$.next(error);
                        return NEVER;
                    })
                )
            )
        );

        // Emits when set function is triggered with a passed value
        const set$ = this._set$.asObservable();

        /* Turn multiple observables into a single observable, which concurrently emits all values
        from every given input Observables. */
        merge(update$, set$)
            .pipe(takeUntil(this._complete$))
            .subscribe(value => this._cache$.next(value));
    }
}
