import { EventEmitter } from 'events';
import { QueueStorage } from '../../interfaces/Storage';

/**
 * Settings consumed by {@link PriorityQueue}.
 */
export type Config = {
  /** Key passed as the first argument to every storage call (`size`, `enqueue`, `dequeue`). */
  cacheKey: string;
  /** Delay handed to `setInterval` by `watch()`: the time (in milliseconds) between `pull()` attempts. */
  delay: number;
  /** Upper bound on pending items; `pull()` returns without dequeuing once this many items are pending. */
  maxConcurrency: number;
};

/**
 * A payload paired with a numeric depth.
 *
 * NOTE(review): this type is not referenced by the queue class in this file;
 * the exact meaning of `depth` is defined by its consumers — confirm there.
 */
export type Item<T> = {
  /** The wrapped payload. */
  data: T;
  /** Depth value associated with the payload. */
  depth: number;
};

/**
 * Priority queue whose items live in an external {@link QueueStorage} and are
 * handed out by polling.
 *
 * While watching, `pull()` is invoked every `config.delay` milliseconds. Each
 * successful pull moves one item from storage into the in-memory list of
 * pending items and emits it; consumers call `release(item)` when they are
 * done so that another item may be pulled.
 *
 * Events:
 * - `'pull'` (item): an item was dequeued from storage and is now pending.
 * - `'empty'`: storage returned nothing and no items are pending.
 *
 * Note that a new instance is neither paused nor watching: polling starts only
 * once `watch()` is called (or after a `pause()` / `resume()` cycle).
 */
export default class PriorityQueue<T> extends EventEmitter {
  /** True between `pause()` and the next `resume()`. */
  protected paused = false;

  /** Backing store that holds the queued (not yet pulled) items. */
  protected storage: QueueStorage<string, T>;

  protected config: Config;

  /** Copy of `config.maxConcurrency` taken at construction time. */
  protected maxConcurrency: number;

  /** Items that were pulled from storage and not yet released. */
  protected pendingItems: T[] = [];

  /** Resolvers of the promises handed out by `onIdle()` that are still waiting. */
  protected resolveIdle: (() => void)[] = [];

  /** Handle of the polling timer, or `undefined` when not watching. */
  protected interval: NodeJS.Timeout | undefined;

  /** Number of `storage.dequeue` calls started by `pull()` that have not settled yet. */
  protected inFlightPulls = 0;

  /**
   * @param config - Storage key, polling delay and concurrency limit.
   * @param storage - Backing store used for `size`, `enqueue` and `dequeue`.
   */
  public constructor(config: Config, storage: QueueStorage<string, T>) {
    super();
    this.storage = storage;
    this.config = config;
    this.maxConcurrency = config.maxConcurrency;
  }

  /** @returns Whether the queue is currently paused. */
  public isPaused(): boolean {
    return this.paused;
  }

  /**
   * Pauses the queue: stops the polling timer and resolves every promise
   * currently waiting on `onIdle()`.
   */
  public pause(): void {
    this.paused = true;
    this.unwatch();
    this.doResolveIdle();
  }

  /**
   * Leaves the paused state and restarts polling. Does nothing when the queue
   * is not paused.
   *
   * @returns An already-resolved promise.
   */
  public resume(): Promise<void> {
    if (this.isPaused()) {
      this.paused = false;
      this.watch();
    }
    return Promise.resolve();
  }

  /**
   * Removes `item` from the pending list, freeing a concurrency slot.
   * Every pending entry strictly equal (`===`) to `item` is removed.
   */
  public release(item: T): void {
    this.pendingItems = this.pendingItems.filter(
      (pendingItem) => pendingItem !== item
    );
  }

  /** @returns The number of items pulled but not yet released. */
  public pending(): number {
    return this.pendingItems.length;
  }

  /** @returns The number of items in storage plus the number of pending items. */
  public async size(): Promise<number> {
    const size = await this.storage.size(this.config.cacheKey);
    return size + this.pending();
  }

  /** Stops the polling timer. Promises waiting on `onIdle()` are left untouched. */
  public end(): void {
    this.unwatch();
  }

  /**
   * @returns A promise that resolves the next time idle waiters are notified:
   * on `pause()`, or when `pull()` finds storage empty while nothing is pending.
   */
  public onIdle(): Promise<void> {
    return new Promise((resolve) => {
      this.resolveIdle.push(resolve);
    });
  }

  /**
   * Resolves every promise currently waiting on `onIdle()` and forgets them.
   */
  protected doResolveIdle(): void {
    if (this.resolveIdle.length < 1) {
      return;
    }

    // Detach the list before invoking it: settled resolvers must not be kept
    // (they would accumulate forever and be re-invoked on every later call),
    // and waiters registered while we iterate belong to the next notification.
    const waiters = this.resolveIdle;
    this.resolveIdle = [];
    for (const resolve of waiters) {
      resolve();
    }
  }

  /**
   * Adds `item` to storage with the given priority. The item is first removed
   * from the pending list, so pushing a pending item re-queues it.
   */
  public async push(item: T, priority: number): Promise<void> {
    this.release(item);
    await this.storage.enqueue(this.config.cacheKey, item, priority);
  }

  /**
   * Tries to move one item from storage to the pending list.
   *
   * Does nothing while paused or while the concurrency limit is reached.
   * Emits `'pull'` with the item on success. When storage returns nothing and
   * no items are pending, resolves idle waiters and emits `'empty'`.
   *
   * Rejections of `storage.dequeue` propagate to the caller.
   */
  public async pull(): Promise<void> {
    if (this.isPaused()) {
      return;
    }

    // Dequeues that are still awaiting storage count toward the limit too;
    // otherwise overlapping calls would all pass this check before any of
    // them registers its item as pending, exceeding maxConcurrency.
    if (this.pendingItems.length + this.inFlightPulls >= this.maxConcurrency) {
      return;
    }

    let item;
    this.inFlightPulls += 1;
    try {
      item = await this.storage.dequeue(this.config.cacheKey);
    } finally {
      // Give the slot back even if storage rejects.
      this.inFlightPulls -= 1;
    }

    if (!item) {
      if (this.pendingItems.length < 1) {
        this.doResolveIdle();
        this.emit('empty');
      }
      return;
    }

    this.pendingItems.push(item[0]);
    this.emit('pull', item[0]);
  }

  /**
   * (Re)starts the polling timer, calling `pull()` every `config.delay`
   * milliseconds. Any previously running timer is stopped first.
   *
   * NOTE: the promise returned by `pull()` is intentionally discarded here, so
   * a rejection coming from storage is not handled by this class.
   */
  public watch(): void {
    this.unwatch();
    this.interval = setInterval(() => {
      void this.pull();
    }, this.config.delay);
  }

  /** Stops the polling timer, if one is running. */
  public unwatch(): void {
    if (this.interval !== undefined) {
      clearInterval(this.interval);
      // Drop the stale handle so the field reflects that nothing is scheduled.
      this.interval = undefined;
    }
  }
}
