Search code examples
nestjsbullnestjsbull

How to remove a job from a queue in @nestjs/bull?


I have two processors the first one is called with a callback function and never call it, this way the job still waiting and do not close unless the timeout, the other process find the waiting job of the processor 1 and moveToCompleted, but for a time the queue do not process any else.

  @Process({ name: 'receive', concurrency: 10 })
  async recieve(job: Job<any>) {
    const unpacked = this.unpackerService.unpack(job.data.message);
    const id = `${unpacked.FORCEDID}`;
    const sendJob = await this.sendQueue.getJob(id);
    this.logger.debug(`received response for ${id}`);
    if (!!sendJob) {
      this.logger.debug(`job ${id} found, moving to complete`);
      console.log(await sendJob.getState());
      await sendJob.moveToCompleted(JSON.stringify(unpacked));
      console.log(await sendJob.getState());
      return;
    }
    this.logger.debug(`job ${id} not found, saving trace`);
  }
  @Process({ name: 'send', concurrency: 2 })
  async sendIso(job: Job<any>, cb) {
    this.logger.debug(`procesing job ${job.id}`);
    await this.connectorService.sendMessage(job.data.iso);
  }

I spect that the process named send only have 2 job at the same time but at the moment one of these jobs has finished, other one be processed


Solution

  • i found the error, when yo declare the process with a callback function and you never exec that callback when the job is moved to complete emit the event finished but the current job remain in stuck state, the solution is do not leave the callback open, a alternative may be subscribe to a event emited on the queue and then exec the callback