Search code examples
javascriptnode.jsrabbitmqes6-promise

Can I wait for a process to complete when consuming RabbitMQ messages with Node.js?


I'm pretty new to Node.js and ES6, and this is just confusing me a little bit. I am trying to leave a process running, consuming messages from a RabbitMQ queue. It needs to be able to process the message (which takes about 30-60 seconds) before it grabs the next message. Currently, the code I have, it grabs all messages it can and then tries to fork the processes. When there are 3-5 messages in the queue, this is fine, but for 20, 50 or 100 messages, this causes the server to run out of memory.

I have tried making the .consume() callback function async and adding await to the message processing function. I have tried wrapping an await new Promise within the .consume() callback around processMessage. I have tried adding await to the line that calls channel.consume. Nothing changes the behavior.

#!/usr/bin/env node

const amqp = require('amqplib');

const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
    const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
    const cluster = await amqp.connect(conn_str);
    const channel = await cluster.createChannel();
    await channel.assertQueue(queue,  { durable: durable, autoDelete: true });
    if (prefetch) {
        channel.prefetch(prefetch);
    }
    console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)

    try {
        channel.consume(queue, message => {
            if (message !== null) {
                console.log(' [x] Received', message.content.toString());
                processMessage(message.content.toString());
                channel.ack(message);
                return null;
            } else {
                console.log(error, 'Queue is empty!')
                channel.reject(message);
            }
        }, {noAck: isNoAck});
    } catch (error) {
        console.log(error, 'Failed to consume messages from Queue!')
        cluster.close(); 
    }
}

exports.consumeFromQueue = consumeFromQueue;

As a sidenote, if I create an array of strings and loop through the strings, when I add await to the processMessage line, it waits to execute process (30-60 seconds) before processing the next string.

(async () => {
    for (let i=0; i<urls.length; i++) {
        await processMessage(urls[i]);
    }
})();

So I basically need something that functions like this, but with listening to the queue in RabbitMQ.


Solution

  • If you want to limit the number of messages being processed by a consumer at any given time, use channel.prefetch():

    The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged.

    That is, if you only want to be able process a single message at a time before moving on to the next, set channel.prefetch(1)