import { Injectable } from '@angular/core';
import {
  AMQPChannel,
  AMQPConsumer,
  AMQPWebSocketClient,
} from '@cloudamqp/amqp-client';
import { Store } from '@ngrx/store';
import { catchError, concatMap, EMPTY, Subject, Subscription } from 'rxjs';
import { WebsocketMessage } from 'src/app/models/webservice';
import { State } from 'src/app/reducers';
import { reportWebSocketError } from 'src/app/reducers/user/user.actions';
import { getDeviceId } from 'src/app/shared/utils.functions';

@Injectable({
  providedIn: 'root',
})
export class WebsocketService {
  private deviceId = '';
  private exchange = 'amq.topic';
  private queue = '';
  private url = `wss://guiitkub:BD3jGb8kGS69vRm3OlyWB0-0YcvgEAxp@buzzing-black-squirrel.rmq4.cloudamqp.com/ws/amqp`;
  private amqp = this.setAmqpClient();

  private channel?: AMQPChannel;
  private consumer?: AMQPConsumer;

  private maxRetryAttempts = 3;
  private socketSubject$ = new Subject<string>();
  private socketSubscription$?: Subscription;

  public messagesSubject$ = new Subject<WebsocketMessage>();

  constructor(private ngrxStore: Store<State>) {}

  public connect(queue: string): Promise<void> {
    this.disconnect();
    this.deviceId = getDeviceId();
    if (queue) this.queue = queue;
    return this.start();
  }

  private cleanupConnection(): void {
    if (this.consumer && this.channel) {
      this.channel.close().catch(() => {});
    }
    if (this.channel && !this.channel.closed) {
      this.channel.close().catch(() => {});
    }
    if (!this.amqp.closed) {
      this.amqp.close().catch(() => {});
    }
    this.amqp = this.setAmqpClient();
    this.channel = undefined;
    this.consumer = undefined;
  }

  public disconnect(): void {
    if (this.socketSubscription$) this.socketSubscription$.unsubscribe();
    this.cleanupConnection();
  }

  public sendMessage(msg: Omit<WebsocketMessage, 'deviceId'>): void {
    if (this.amqp.closed) {
      console.error('Cannot send message, WebSocket is closed');
      this.amqp = this.setAmqpClient();
    }
    this.socketSubject$.next(this.encode({ ...msg, deviceId: this.deviceId }));
  }

  private setAmqpClient(): AMQPWebSocketClient {
    return new AMQPWebSocketClient(
      this.url,
      'websockets',
      'kds_service',
      '8X3Jkx-y=#,27yR',
      undefined,
      4096,
      30,
    );
  }

  private async start(retryCount = 0): Promise<void> {
    this.amqp.onerror = (err) => this.handleError(err, retryCount);
    try {
      const conn = await this.amqp.connect();
      this.channel = await conn.channel();
      this.channel.onerror = (err) => this.handleError(err, retryCount);

      // Chain operations sequentially
      const q = await this.channel.queue('');
      await this.channel.prefetch(1);
      await q.bind(this.exchange, this.queue);

      this.consumer = await q.subscribe({ noAck: false }, (msg) => {
        const parsedMsg = this.decode(
          msg.bodyToString() ?? '',
        ) as WebsocketMessage;
        if (parsedMsg['deviceId'] !== this.deviceId) {
          this.messagesSubject$.next(parsedMsg);
        }
        msg.ack();
      });

      // Attach publish after setting up the consumer
      await this.attachPublish(this.channel, retryCount);
    } catch (err) {
      this.handleError(err, retryCount);
    }
  }

  private async attachPublish(
    ch: AMQPChannel,
    retryCount: number,
  ): Promise<void> {
    this.socketSubscription$ = this.socketSubject$
      .pipe(
        concatMap(async (msg) => {
          if (this.amqp.closed || ch.closed) {
            throw new Error('AMQP connection or channel is closed');
          }
          await ch.basicPublish(this.exchange, this.queue, msg, {
            contentType: 'application/json',
            expiration: '5000',
          });
        }),
        catchError((err: unknown) => {
          this.handleError(err, retryCount);
          return EMPTY;
        }),
      )
      .subscribe();
  }

  private decode = (msg: string): unknown => JSON.parse(msg);

  private encode = (obj: unknown): string => JSON.stringify(obj);

  private handleError(err: unknown, retryCount: number): void {
    if (this.amqp.closed) {
      this.amqp = this.setAmqpClient();
    }
    console.error('Error', err, 'reconnecting in 1s');
    if (retryCount < this.maxRetryAttempts) {
      setTimeout(() => {
        this.start(retryCount + 1);
      }, 1000);
    } else {
      this.ngrxStore.dispatch(reportWebSocketError());
    }
  }
}
