import { BehaviorSubject, interval, of } from "rxjs";
import { fromFetch } from "rxjs/fetch";

import {
  catchError,
  distinctUntilChanged,
  map,
  share,
  switchMap,
  take,
} from "rxjs/operators";

/**
 * Type guard used to check a decoded payload at runtime.
 *
 * Accepts a value of any type and, when it returns `true`, narrows that
 * value to `T` for the caller.
 */
type Validator<T> = (candidate: any) => candidate is T;

/**
 * Builds a shared observable that fetches `query`, parses the response body
 * as JSON and validates it.
 *
 * Every subscriber first receives `undefined` (the seed of the underlying
 * `BehaviorSubject`) and then the validated payload. Consecutive identical
 * emissions are suppressed by `distinctUntilChanged`.
 *
 * @param params - Options forwarded to `fromFetch` as its init argument,
 *   plus the fields below (which are stripped before forwarding):
 *   - `query`: the URL passed to `fromFetch`.
 *   - `validate`: type guard applied to the parsed JSON; a `false` result is
 *     turned into an error.
 *   - `onError`: called with any error raised by the request, the status
 *     check, JSON parsing or validation. A truthy return value schedules a
 *     retry; a falsy one (or no callback at all) makes the stream emit
 *     `undefined` instead of failing.
 *   - `retryTimeout`: delay in milliseconds before a retry; defaults to 0.
 * @returns An observable of `T | undefined`, multicast to all subscribers.
 */
export const fetchPipeline = <T>(
  params: Parameters<typeof fromFetch>[1] & {
    query: string;
    validate: Validator<T>;
    onError?: (e: unknown) => void | boolean;
    retryTimeout?: number;
  },
) => {
  const { query, validate, onError, retryTimeout, ...rest } = params;

  return fromFetch(query, rest).pipe(
    switchMap((response) => {
      // Non-2xx responses are treated as failures and routed to catchError.
      if (!response.ok) {
        throw new Error(`status ${response.status}`);
      }

      return response.json();
    }),

    map((raw) => {
      if (!validate(raw)) {
        throw new Error(`unexpected result, got ${JSON.stringify(raw)}`);
      }

      return raw;
    }),

    catchError((err, caught) =>
      !onError?.(err)
        ? // Not retrying: swallow the error and report "no data".
          of(undefined)
        : // Retrying: wait `retryTimeout` ms, then resubscribe to the
          // pipeline (`caught` includes this catchError, so retries chain).
          interval(retryTimeout ?? 0).pipe(
            take(1),
            switchMap(() => caught),
          ),
    ),

    // `share` invokes the connector each time it (re)connects to the source.
    // It must hand out a fresh subject: reusing one that has already
    // completed or errored would terminate later subscribers immediately,
    // without the initial `undefined` and without starting a new fetch.
    share({ connector: () => new BehaviorSubject<T | undefined>(undefined) }),
    distinctUntilChanged(),
  );
};
