import {
  Environment,
  Network,
  RecordSource,
  Store,
  Observable as RelayObservable,
} from "relay-runtime";
import { BehaviorSubject, Observable } from "rxjs";
import UserFriendlyMessageError from "common/UserFriendlyMessageError.ts";
import WebsocketConnectionFailedError from "common/WebsocketConnectionFailedError.ts";
import logger from "common/logger.ts";
import fetchGraphql from "common/fetchGraphql.ts";
import GraphqlError from "./GraphqlError.ts";
import Auth from "./Auth.ts";
import { Config } from "./config.ts";

export default function createRelayEnvironment({
  auth,
  onWebsocketConnected,
  onWebsocketDisconnected,
  onWebsocketError,
  localeSubject,
  config,
}: {
  auth?: Auth;
  onWebsocketConnected?: () => void;
  onWebsocketDisconnected?: (props: { retryCount: number; error: any }) => void;
  onWebsocketError?: (error: any) => void;
  localeSubject?: BehaviorSubject<string>;
  config: Config;
}): Environment {
  /**
   * Fetch function handed to the Relay network layer.
   *
   * Builds a request for `fetchGraphql` against `config.graphql.endpointUrl`:
   * - `Accept-Language` is set from the current `localeSubject` value, if any.
   * - `authorization` is set to `Basic <auth.auth>` when `auth` was supplied.
   * - With `uploadables`, the body is a `FormData` holding the query (text
   *   and/or id), the JSON-encoded variables and every truthy uploadable;
   *   otherwise it is a plain object with the same query fields.
   *
   * @param operation Request descriptor; `text`, `id` and `operationKind` are read.
   * @param variables Operation variables.
   * @param cacheConfig Unused; accepted to match the call signature.
   * @param uploadables Optional map of field name to a value or array of values.
   * @returns Whatever `fetchGraphql` returns; for queries the payload is
   *   additionally inspected for `errors` before being passed through.
   */
  function fetchQuery(operation, variables, cacheConfig, uploadables) {
    const headers: { [key: string]: any } = {};

    const currentLocale = localeSubject?.getValue();
    if (currentLocale) headers["Accept-Language"] = currentLocale;
    if (auth) headers.authorization = `Basic ${auth.auth}`;

    let body: FormData | { [key: string]: any };

    if (uploadables) {
      const form = new FormData();
      if (operation.text) form.append("query", operation.text);
      if (operation.id) form.append("queryMd5Hash", operation.id);
      form.append("variables", JSON.stringify(variables));

      // An uploadable is either one value or a list of values; empty slots are skipped.
      for (const [name, entry] of Object.entries(uploadables)) {
        const candidates = Array.isArray(entry) ? entry : [entry];
        for (const candidate of candidates) {
          if (candidate) form.append(name, candidate);
        }
      }

      body = form;
    } else {
      const payload: { [key: string]: any } = {};
      if (operation.text) payload.query = operation.text;
      if (operation.id) payload.queryMd5Hash = operation.id;
      if (variables) payload.variables = variables;
      body = payload;
    }

    const response = fetchGraphql(config.graphql.endpointUrl, {
      headers,
      body,
    });

    if (operation.operationKind !== "query") return response;

    // Errors inside a query payload are logged here because QueryRenderer
    // does not surface partial errors on its own.
    return response.then((json) => {
      if (json.errors) {
        const failure = new GraphqlError("Errors in query payload.", {
          errors: json.errors,
        });
        logger.error(failure);
      }
      return json;
    });
  }

  /**
   * Values of the `type` field on frames exchanged over the subscriptions
   * websocket.
   */
  enum SubscriptionMessageType {
    // Frames the subscriptions client sends.
    Subscribe = "subscribe",
    Cancel = "cancel",
    Ack = "ack",

    // Frames the subscriptions client reacts to when they arrive.
    Next = "next",
    Error = "error",
    Complete = "complete",
  }

  /**
   * Shape of a JSON frame exchanged over the subscriptions websocket, in
   * either direction. Every field is optional because different frames use
   * different subsets.
   */
  type SubscriptionMessage = {
    /** Frame id; echoed back in an `ack` frame when `requiresAck` is set. */
    id?: string;
    /** Credentials (`Basic …`) carried by the frame sent right after the socket opens. */
    authenticate?: string;
    /** Stream id remembered from incoming frames and repeated on the next open. */
    streamId?: string;
    /**
     * Frame type. Optional because the frame sent right after the socket
     * opens carries only `authenticate` / `streamId` and no `type`.
     */
    type?: string;
    /** Id of the subscription a frame refers to. */
    subscriptionId?: string;
    /** Subscriptions being registered by a `subscribe` frame. */
    subscriptions?: {
      id: string;
      /** Query text; absent when only `queryMd5Hash` is supplied. */
      query?: string;
      /** Hash identifying the query, sent alongside or instead of `query`. */
      queryMd5Hash?: string;
      variables: { [key: string]: any };
    }[];
    payload?: any;
    requiresAck?: true;
  };

  /** Book-keeping kept for every subscription registered with the client. */
  type ActiveSubscription = {
    query?: string;
    queryMd5Hash?: string;
    variables: { [key: string]: any };
    observer: {
      next(value?: any): void;
      error(error?: any): void;
      complete(): void;
    };
    /** True once a subscribe frame for this entry went out on the current socket. */
    sent: boolean;
  };

  /**
   * Runs all GraphQL subscriptions over one websocket.
   *
   * The socket is created lazily by the first subscription. Whenever it
   * closes, a reconnect is scheduled and every subscription that is still
   * registered is sent again once the new socket opens.
   */
  class GraphqlSubscriptionsClient {
    // A Map rather than a plain object: ids arriving from the wire are used
    // for lookups, and keys like "constructor" must not resolve to anything.
    private subscriptions = new Map<string, ActiveSubscription>();

    private lastSubscriptionId = 0;

    private url: string;

    private socket?: WebSocket | null;

    private sockOpen = false;

    private streamId?: string;

    // Stays undefined until the socket first opens (set to 0) or the first
    // reconnect attempt starts (incremented).
    private connectRetryCount?: number;

    /** @param url Websocket endpoint to connect to. */
    constructor(url: string) {
      this.url = url;
    }

    /** Serialises `message` as JSON and writes it to the current socket, if there is one. */
    private send(message: Partial<SubscriptionMessage>) {
      this.socket?.send(JSON.stringify(message));
    }

    /** Creates the socket and wires its handlers unless a socket already exists. */
    private ensureConnected() {
      if (this.socket) return;

      const socket = new WebSocket(this.url);

      socket.onerror = (error: any) => {
        if (onWebsocketDisconnected && socket.readyState === WebSocket.CLOSED) {
          if (this.connectRetryCount === undefined) {
            // Never opened and no retry started yet: report as a plain error.
            onWebsocketError?.(
              new WebsocketConnectionFailedError({ cause: error }),
            );
          } else
            onWebsocketDisconnected({
              error: new WebsocketConnectionFailedError({ cause: error }),
              retryCount: this.connectRetryCount,
            });
        } else if (onWebsocketError)
          onWebsocketError(
            new UserFriendlyMessageError("Live connection error.", {
              cause: error,
            }),
          );
      };

      socket.onopen = () => {
        this.sockOpen = true;
        this.connectRetryCount = 0;
        onWebsocketConnected?.();

        // First frame on every connection: credentials plus the last known
        // stream id, when available. It carries no `type`.
        const handshake: Partial<SubscriptionMessage> = {};
        if (auth) handshake.authenticate = `Basic ${auth.auth}`;
        if (this.streamId) handshake.streamId = this.streamId;

        this.send(handshake);
        this.ensureSubscriptionsSent();
      };

      socket.onclose = () => {
        this.socket = null;
        this.sockOpen = false;

        // Mark everything as unsent so it is sent again after the next open.
        for (const subscription of this.subscriptions.values())
          subscription.sent = false;

        // Reconnect after 1s, growing by 1s per retry up to 6s.
        setTimeout(
          () => {
            try {
              this.connectRetryCount = (this.connectRetryCount ?? 0) + 1;
              this.ensureConnected();
            } catch (error) {
              if (onWebsocketError)
                onWebsocketError(
                  new UserFriendlyMessageError("Live connection error.", {
                    cause: error,
                  }),
                );
            }
          },
          1000 + Math.min(5, this.connectRetryCount ?? 0) * 1000,
        );
      };

      socket.onmessage = (e) => {
        const message = JSON.parse(e.data) as SubscriptionMessage;
        const { id, streamId, subscriptionId, type, payload, requiresAck } =
          message;
        const subscription =
          subscriptionId === undefined
            ? undefined
            : this.subscriptions.get(subscriptionId);

        if (streamId !== undefined) this.streamId = streamId;

        if (type === SubscriptionMessageType.Next && subscription) {
          // We need to log errors here, as relay does not seem to trigger onError
          if (payload?.errors)
            logger.error(
              new GraphqlError("Errors in subscription payload.", {
                errors: payload.errors,
              }),
            );

          subscription.observer.next(payload);
        } else if (type === SubscriptionMessageType.Complete && subscription) {
          // A complete frame may arrive without any payload.
          payload?.errors?.forEach((error: any) =>
            subscription.observer.error(error),
          );
          subscription.observer.complete();
        } else if (type === SubscriptionMessageType.Error) {
          if (subscription) subscription.observer.error(payload);
          else onWebsocketError?.(payload);
        }

        if (requiresAck && id !== undefined)
          this.send({ type: SubscriptionMessageType.Ack, id });
      };

      this.socket = socket;
    }

    /**
     * Registers a subscription and returns an observable of its payloads.
     *
     * Work starts when the observable is subscribed to: the socket is created
     * if needed and the subscribe frame is sent as soon as the socket is open.
     * Unsubscribing removes the registration and, if the subscribe frame was
     * sent on the current socket, sends a cancel frame.
     *
     * @param query Query text, if available.
     * @param queryMd5Hash Hash identifying the query, if available.
     * @param variables Operation variables.
     */
    public addSubscription({
      query,
      queryMd5Hash,
      variables,
    }: {
      query?: string;
      queryMd5Hash?: string;
      variables: { [key: string]: any };
    }) {
      return RelayObservable.from(
        new Observable((observer) => {
          this.ensureConnected();
          this.lastSubscriptionId += 1;
          const subscriptionId = this.lastSubscriptionId.toString();

          const subscription: ActiveSubscription = {
            query,
            queryMd5Hash,
            variables,
            observer,
            sent: false,
          };
          this.subscriptions.set(subscriptionId, subscription);
          if (this.sockOpen) this.ensureSubscriptionsSent();

          return () => {
            this.subscriptions.delete(subscriptionId);
            // Cancel only what was actually announced on this socket. While
            // the socket is still connecting nothing was sent, and writing
            // to it would throw.
            if (subscription.sent)
              this.send({
                type: SubscriptionMessageType.Cancel,
                subscriptionId,
              });
          };
        }),
      );
    }

    /** Sends one subscribe frame covering every registered subscription not yet sent. */
    private ensureSubscriptionsSent() {
      const pending: {
        id: string;
        query?: string;
        queryMd5Hash?: string;
        variables: { [key: string]: any };
      }[] = [];

      for (const [id, subscription] of this.subscriptions) {
        if (subscription.sent) continue;
        subscription.sent = true;
        const { query, queryMd5Hash, variables } = subscription;
        pending.push({ id, query, queryMd5Hash, variables });
      }

      if (pending.length > 0) {
        this.send({
          type: SubscriptionMessageType.Subscribe,
          subscriptions: pending,
        });
      }
    }
  }

  // Created by the first subscription and shared by all later ones.
  let graphqlSubscriptionClient: GraphqlSubscriptionsClient | undefined;

  /**
   * Subscribe function handed to the Relay network layer.
   *
   * Creates the subscriptions client for `config.graphql.subscriptionsEndpointUrl`
   * on first use, then registers the operation with it. The operation's `text`
   * is forwarded as `query` and its `id` as `queryMd5Hash`.
   *
   * @returns The observable returned by the client's `addSubscription`.
   */
  const subscribe = (operation, variables) => {
    if (graphqlSubscriptionClient === undefined) {
      const endpoint = config.graphql.subscriptionsEndpointUrl;
      graphqlSubscriptionClient = new GraphqlSubscriptionsClient(endpoint);
    }

    const registration = {
      query: operation.text,
      queryMd5Hash: operation.id,
      variables,
    };
    return graphqlSubscriptionClient.addSubscription(registration);
  };

  return new Environment({
    network: Network.create(fetchQuery, subscribe),
    store: new Store(new RecordSource()),
  });
}
