node.js, rabbitmq, amqp, node-amqp

AMQPlib nodejs consumer task concurrency


I'm building a background task management system with RabbitMQ and Node.js, using the amqplib module.

Some of the tasks are very CPU-intensive, so if I launch a lot of them while only a few workers are up, my server can get killed for using too much CPU.

I'm wondering if there is a way to create an AMQP queue so that my consumers will only consume one task from this queue at a time (i.e. until the current task is acked or rejected, do not send another task of this kind to this consumer). Or should I handle this myself in the code, for example by keeping a flag in my worker that says I'm already handling a task from this queue and rejecting all other tasks from it while that task is running?

Here is my sample code.

I create the AMQP connection like this:

const amqpConn = require('amqplib').connect('amqp://localhost');

My queue is named tasks, and this is how I publish to it:

amqpConn.then((conn) => {
  return conn.createChannel();
}).then((ch) => {
  return ch.assertQueue('tasks').then((ok) => {
    // Pass the queue name explicitly; Buffer.from replaces the deprecated new Buffer().
    ch.sendToQueue('tasks', Buffer.from('something to do'));
  });
}).catch(console.warn);

And here is my consumer (I guess this is where I should limit the queue to one concurrent task):

amqpConn.then((conn) => {
  return conn.createChannel();
}).then((ch) => {
  return ch.assertQueue('tasks').then((ok) => {
    return ch.consume('tasks', (msg) => {
      if (msg !== null) {
        console.log(msg.content.toString());
        ch.ack(msg);
      }
    });
  });
}).catch(console.warn);

Thanks a lot!


Solution

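  • I think I got it working by:

    • creating one channel per queue
    • using the channel's prefetch count (prefetch_count in AMQP, ch.prefetch(count) in amqplib) to limit how many unacknowledged messages each consumer can hold. With a count of 1, a consumer is not sent another task until it has acked or rejected the current one.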

    https://www.rabbitmq.com/consumer-prefetch.html
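
For anyone landing here, this is roughly what the two steps above could look like with amqplib's promise API. Treat it as a minimal sketch, not the exact code I run: startWorker, handleTask and main are names I made up for the example, and rejecting failed tasks without requeueing is my own assumption about what you want on errors. Only assertQueue, prefetch, consume, ack and nack are actual amqplib channel methods.

```javascript
// Sketch: process one task at a time from a queue by setting the channel prefetch.
// `startWorker`, `handleTask` and `main` are hypothetical names, not part of amqplib.
async function startWorker(ch, queue, handleTask) {
  await ch.assertQueue(queue);
  // With the default (non-global) setting, RabbitMQ applies this limit to each
  // consumer started on the channel afterwards: at most 1 unacknowledged message.
  await ch.prefetch(1);
  return ch.consume(queue, async (msg) => {
    if (msg === null) return; // the broker cancelled this consumer
    try {
      await handleTask(msg.content.toString());
      ch.ack(msg); // only now will the broker deliver the next task
    } catch (err) {
      // Assumption: drop (or dead-letter) a failing task instead of requeueing it.
      ch.nack(msg, false, false);
    }
  });
}

// Wiring it up; needs the amqplib package and a broker on localhost.
async function main() {
  const conn = await require('amqplib').connect('amqp://localhost');
  const ch = await conn.createChannel(); // one channel dedicated to the 'tasks' queue
  await startWorker(ch, 'tasks', async (body) => {
    console.log('working on', body);
  });
}
// To start the worker: main().catch(console.warn);
```

Because the limit lives on the channel, a queue with cheap tasks can get its own channel with a higher prefetch value, while the CPU-heavy queue stays at 1.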