import { Injectable } from '@angular/core';

import { Observable, Subject, Subscriber } from 'rxjs';

import { Bg2Socket } from 'app/core/bg2socket.service';

import { environment } from 'environments/environment';

import { Entity } from 'app/models';
import { Dictionary } from 'app/typings/core/interfaces';
import { RunSnapshotInterface } from '../run/models/RunSnapshot';

export interface StreamNotification {
  id?: number;
  entity?: any;
  no_state?: boolean;
  store_replay?: boolean;
  run?: RunSnapshotInterface;
  inconsistent_from_date?: any;
  inconsistent_from_event?: any;
  entity_state?: Dictionary<any>;
  invalidate_bg2_cache?: boolean;
  action: 'state_update' | 'update' | 'invalidate';
}

@Injectable({
  providedIn: 'root'
})
export class Beeguard2Stream {
  // #region -> (constants)

  private readonly ENTITY_ROOM_PREFIX = '_entity_';

  private _is_stream_enabled = environment.activateAsyncStream;

  // #endregion

  // #region -> (service basics)

  private _reconnect$ = new Subject<Set<string>>();
  public reconnect$ = this._reconnect$.asObservable();

  private rooms: Set<string> = new Set<string>();
  private rooms_observers: { [room: string]: Subscriber<any> } = {};

  constructor(private _socket: Bg2Socket) {}

  // #endregion

  private connect(): void {
    if (this._is_stream_enabled && this._socket === null) {
      console.log('[SIO] Connecting to socketio... (' + this._socket.ioSocket.id + ')');

      this._socket.on('connect', () => {
        console.log('[SIO] connected');

        this.rooms.forEach(room => {
          console.log(`[SIO] (re)join ${room}`);

          this._socket.emit('join', { room }, (ack: any) => {
            // Replay last notification if available
            if (ack.last && this.rooms_observers[room]) {
              console.log(room, 'Get last notification', ack);
              this.rooms_observers[room].next(ack.last);
            }
          });
        });
      });

      this._socket.on('disconnect', () => {
        console.log('[SIO] disconnected');
      });

      this._socket.on('reconnect', () => {
        this._reconnect$.next(this.rooms);
        console.log('[SIO] reconnect');
      });
    }
  }

  public subscribeToEntity(entity: Entity): Observable<StreamNotification> {
    if (!this._is_stream_enabled) {
      return new Observable(observer => {});
    }

    this.connect();

    const entity_id = entity.id;
    const entity_type = entity.type;
    const room = this.ENTITY_ROOM_PREFIX + entity_id;

    const observable = new Observable(observer => {
      // console.log(`[SIO] join ${room} (${etype})`);
      this.rooms.add(room);
      this.rooms_observers[room] = observer;
      this._socket.emit('join', { room }, (ack: any) => {
        // Replay last notification if available
        if (ack.last) {
          // console.log(entity.desc, 'ACK', ack);
          observer.next(ack.last);
        }
      });

      this._socket.on(room + '_update', (data: any) => {
        // console.log(entity.desc, 'Update', data);
        observer.next(data);
      });

      return () => {
        console.log(`[SIO] leave ${entity_type}`);
        this._socket.emit('leave', { room });
        this.rooms.delete(room);
        // delete this.rooms_observers[room];
      };
    });

    return observable as any;
  }
}
