import { Injectable } from '@angular/core';
import { Observable, MonoTypeOperatorFunction, Subscription, OperatorFunction, EMPTY, of, identity } from 'rxjs';
import { switchMap, takeWhile, startWith, last } from 'rxjs/operators';
import { intervalBackoff, IntervalBackoffConfig } from 'backoff-rxjs';

import { FutureResponse } from '../../generated-sources';

import { DkuHttpService, HttpVerb } from './dku-http.service';
import { auditMap } from '../utils/rxutils';
import { Modify } from '../utils/typedefs';

/**
 * A FutureResponse whose 'result' property is typed as T.
 *
 * NOTE(review): assumes Modify<A, B> overrides the properties of A with those of B;
 *               confirm against '../utils/typedefs'.
 */
export type TypedFutureResponse<T> = Modify<FutureResponse, { result: T }>;

/**
 * Schedule handed to intervalBackoff() when polling a future.
 *
 * The delay for a given iteration is 1.1^iteration * initialInterval, i.e. it grows
 * by 10% per iteration starting from 'initialInterval'.
 * NOTE(review): values are presumably milliseconds and 'maxInterval' presumably caps
 *               the computed delay; confirm against the backoff-rxjs documentation.
 */
const POLLING_BACKOFF_STRATEGY: IntervalBackoffConfig = {
    initialInterval: 500,
    maxInterval: 10000,
    backoffDelay: (iteration, initialInterval) => {
        const growth = Math.pow(1.1, iteration);
        return growth * initialInterval;
    }
};

@Injectable({
    providedIn: 'root'
})
export class FutureWatcherService {
    constructor(private dkuHttpService: DkuHttpService) { }

    /**
     * Send a request to an API endpoint that answers with a future, then keep polling
     * that future. The returned observable emits the initial response followed by each
     * update, up to and including the first response that is no longer alive.
     *
     * If 'abortInBackend' is true (the default), unsubscribing from the returned
     * observable while the future is still running asks the backend to abort it.
     */
    public requestFuture<T>(
        method: HttpVerb,
        path: string,
        params?: object,
        abortInBackend = true
    ): Observable<TypedFutureResponse<T>> {
        const polledFuture$ = this.dkuHttpService
            .request<TypedFutureResponse<T>>(method, path, params || {})
            .pipe(this.pollFuture<T>());

        if (!abortInBackend) {
            return polledFuture$;
        }
        return polledFuture$.pipe(this.abortFutureOnUnsubscription<T>());
    }

    /**
     * RxJS operator meant to be chained after requestFuture(): it waits for the source
     * to complete, then emits the 'result' of the final response when 'hasResult' is set.
     * When the final response has no result, the output completes without emitting.
     *
     * Note: per RxJS, last() raises an EmptyError if the source completes without
     *       having emitted any response at all.
     */
    public waitForResult<T>(): OperatorFunction<TypedFutureResponse<T>, T> {
        return source$ => source$.pipe(
            last(),
            switchMap(finalResponse => {
                if (!finalResponse.hasResult) {
                    return EMPTY;
                }
                return of(finalResponse.result);
            })
        );
    }

    /**
     * Private operator implementing the polling.
     *
     * For each response of the initial request, updates are fetched for its 'jobId'
     * on the POLLING_BACKOFF_STRATEGY schedule. The initial response is re-emitted
     * first, and the stream stops right after the first response whose 'alive' flag
     * is falsy (that response is still emitted).
     */
    private pollFuture<T>(): MonoTypeOperatorFunction<TypedFutureResponse<T>> {
        return firstCall$ => firstCall$.pipe(
            switchMap(firstResponse => {
                const fetchUpdate = () => this.getFutureUpdate<T>(firstResponse.jobId);
                const ticks$ = intervalBackoff(POLLING_BACKOFF_STRATEGY);

                return ticks$.pipe(
                    // NOTE(review): auditMap is a project helper; presumably it drops ticks
                    //               while an update request is in flight (see rxutils)
                    auditMap(fetchUpdate),
                    startWith(firstResponse),
                    // 'true' = inclusive: the terminal (not alive) response is emitted too
                    takeWhile(update => update.alive, true)
                );
            })
        );
    }

    /**
     * Private operator implementing the 'abort in backend when unsubscribed' feature.
     *
     * The source subscription is deliberately NOT torn down right away when the consumer
     * unsubscribes before any 'jobId' is known: it is kept until a response carrying a
     * running job arrives, at which point the abort request is sent and the source is
     * finally unsubscribed.
     *
     * Note: the backend cannot be asked to abort a future before the frontend has
     *       received its 'jobId'. If the 'jobId' never comes back (eg. network issue),
     *       the future cannot be canceled (inherent limitation of the futures API).
     */
    private abortFutureOnUnsubscription<T>(): MonoTypeOperatorFunction<TypedFutureResponse<T>> {
        return source$ => new Observable(downstream => {
            let upstream: Subscription;
            // Id of the job that can currently be aborted, or null when there is none
            let abortableJobId: string | null = null;
            // Becomes true once the consumer has unsubscribed
            let unsubscribed = false;

            const abortIfRequested = () => {
                // Checks are ordered so that 'upstream' is only read once the consumer has
                // unsubscribed, i.e. after it has been assigned below
                if (!unsubscribed || !abortableJobId || upstream.closed) {
                    return;
                }
                // Fire and forget: a failure of the abort request cannot be handled here
                this.abortFuture(abortableJobId).subscribe();
                // Stop watching the future
                upstream.unsubscribe();
            };

            upstream = source$.subscribe({
                next: response => {
                    const stillRunning = response.alive && !response.aborted;
                    abortableJobId = stillRunning ? response.jobId : null;
                    // Covers the case where the consumer left before the job id was known
                    abortIfRequested();
                    downstream.next(response);
                },
                error: failure => {
                    abortableJobId = null;
                    downstream.error(failure);
                },
                complete: () => {
                    abortableJobId = null;
                    // No-op at this point, since the job id has just been cleared
                    abortIfRequested();
                    downstream.complete();
                }
            });

            // Teardown: executed when the consumer unsubscribes (or the stream terminates)
            return () => {
                unsubscribed = true;
                abortIfRequested();
            };
        });
    }

    /**
     * Fetch the current state of the future identified by 'futureId'
     */
    private getFutureUpdate<T>(futureId: string) {
        const query = { futureId };
        return this.dkuHttpService.request<TypedFutureResponse<T>>('GET', '/futures/get-update', query);
    }

    /**
     * Ask the backend to abort the future identified by 'futureId'
     */
    private abortFuture(futureId: string) {
        const payload = { futureId };
        return this.dkuHttpService.request<void>('POST', '/futures/abort', payload);
    }
}
