Search code examples
node.js rabbitmq node-amqp node-amqplib

Node RabbitMQ consume message and do something for each message


I want to consume messages from a RabbitMQ service and do something with each message I receive (e.g., store the message in a database, or process it and send a reply through another RabbitMQ queue).

Currently my RabbitMq consumer code is as follows:

const all = require('bluebird').all;
const basename = require('path').basename;


/**
 * Start a RabbitMQ consumer that logs every message whose routing key matches
 * one of the severities given on the command line.
 *
 * Steps: read the severities from `process.argv`, declare the durable direct
 * exchange `test-exchange` and the durable queue `test`, bind the queue once
 * per severity, then consume with `noAck: true` and log each message.
 *
 * Side effects: prints a usage line and exits with code 1 when no severity is
 * given; installs a one-shot SIGINT handler that closes the connection.
 *
 * NOTE(review): this function relies on an `amqp` binding (amqplib) being in
 * scope; confirm the module requires it.
 *
 * @returns {void} Failures in the connect/setup chain are reported through
 *     `console.warn`.
 */
function receive() {
    const severities = process.argv.slice(2);
    if (severities.length < 1) {
        console.warn('Usage: %s [info] [warning] [error]',
            basename(process.argv[1]));
        process.exit(1);
    }

    const config = {
        protocol: 'amqp',
        hostname: 'localhost',
        port: 5672,
        username: 'rumesh',
        password: 'password',
        locale: 'en_US',
        frameMax: 0,
        heartbeat: 0,
        vhost: '/',
    };
    const queueName = 'test';
    const exchange = 'test-exchange';
    const exchangeType = 'direct';

    /** Log one delivered message as "routingKey:'body'". */
    function logMessage(msg) {
        // amqplib calls the consumer with null when the broker cancels it.
        if (msg === null) {
            console.warn(' [!] Consumer was cancelled by the server.');
            return;
        }
        console.log(" [x] %s:'%s'",
            msg.fields.routingKey,
            msg.content.toString());
    }

    amqp.connect(config).then(function (conn) {
        process.once('SIGINT', function () {
            conn.close();
        });
        return conn.createChannel().then(function (ch) {
            return ch.assertExchange(exchange, exchangeType, {durable: true})
                .then(function () {
                    return ch.assertQueue(queueName, {durable: true});
                })
                .then(function (qok) {
                    // Each bindQueue promise must be returned; otherwise
                    // all() receives an array of undefined and resolves
                    // before any binding has been created.
                    const bindings = severities.map(function (sev) {
                        // The optional 4th parameter of bindQueue is the
                        // binding-arguments table, not an options object, so
                        // no {durable: true} is passed here.
                        return ch.bindQueue(qok.queue, exchange, sev);
                    });
                    return all(bindings).then(function () {
                        return qok.queue;
                    });
                })
                .then(function (boundQueue) {
                    return ch.consume(boundQueue, logMessage, {noAck: true});
                })
                .then(function () {
                    console.log(' [*] Waiting for logs. To exit press CTRL+C.');
                });
        });
    }).catch(console.warn);
}


// Expose the consumer entry point so other modules can start it.
module.exports = receive;

Solution

  • I'd suggest you create a handler function like onNewMessage that gets called each time you get a new message on a queue.

    You can encode messages in lots of ways since you can send binary data via AMQP.

    JSON is one convenient way to encode messages, and it is very easy to work with in Node.js.

    Here's some sample code that connects to a server, then sends and receives messages:

    const amqp = require('amqplib');
    
    // Name of the queue that this example both publishes to and consumes from.
    const queue = 'test';
    
    // Connection settings handed to amqp.connect(). Set your config here...
    // The binding is never reassigned, hence const.
    // NOTE(review): the credentials are hard-coded for the example only; load
    // them from the environment or a secret store in real deployments.
    const config = {
        protocol: 'amqp',
        hostname: 'localhost',
        port: 5672,
        username: 'rumesh',
        password: 'password',
        locale: 'en_US',
        // Per the amqplib connect() docs, 0 means no frame-size limit.
        frameMax: 0,
        // Per the amqplib connect() docs, 0 disables heartbeats.
        heartbeat: 0,
        vhost: '/',
    };
    
    
    /**
     * Connect to the AMQP server, make sure the queue exists, then start the
     * consumer and the periodic sender on a single channel.
     *
     * Any error raised during set-up is caught and logged; nothing is
     * rethrown, so the returned promise always fulfils.
     *
     * @returns {Promise<void>}
     */
    async function start() {
        try {
            const conn = await createConnection(config);
            console.log("Connected to AMQP server.");
            const channel = await conn.createChannel();
            await channel.assertQueue(queue, { durable: true});
    
            // Awaiting is a no-op when the helper returns nothing; when it
            // returns the consume promise, a failed consumer registration is
            // caught below instead of escaping this try/catch.
            await startPollingForMessages(channel);
            startSendingMessages(channel);
        } catch (err) {
            console.error("start: Connection error:",err.message);
        }
    }
    
    /**
     * Open an AMQP connection and attach logging handlers for its "error"
     * and "close" events.
     *
     * @param {object|string} config - Connection settings passed straight to
     *     `amqp.connect`.
     * @returns {Promise<object>} The connection returned by `amqp.connect`.
     */
    async function createConnection(config) {
        const conn = await amqp.connect(config);
    
        conn.on("error", (err) => {
            console.error("Connection error:",err.message);
        });
    
        // The "close" event carries an error only when the close was caused
        // by the server or by a failure; a clean close passes no argument.
        // The handler must therefore declare the parameter and tolerate its
        // absence, rather than reading an undeclared `err`.
        conn.on("close", (err) => {
            if (err) {
                console.error("Connection closed:", err.message);
            } else {
                console.error("Connection closed.");
            }
        });
    
        return conn;
    }
    
    /**
     * Publish a small JSON test message to the queue every 5 seconds.
     *
     * @param {object} channel - Channel handed through to `sendMessage`.
     * @returns {*} The handle returned by `setInterval`, so the caller can
     *     stop the sender with `clearInterval`.
     */
    function startSendingMessages(channel) {
        const SEND_INTERVAL = 5000;
        return setInterval(async () => {
            const payload = JSON.stringify({ timestamp: new Date().toISOString(), message: " Some message" });
            try {
                // sendMessage is async: awaiting it here keeps a failed send
                // (e.g. on a closed channel) from becoming an unhandled
                // promise rejection.
                await sendMessage(channel, queue, payload);
            } catch (err) {
                console.error("startSendingMessages: send failed:", err.message);
            }
        }, SEND_INTERVAL);
    }
    
    /**
     * Log the outgoing message, then publish it to the given queue.
     *
     * @param {object} channel - Object exposing `sendToQueue(queue, buffer)`.
     * @param {string} queue - Name of the destination queue.
     * @param {string} messageContent - Message body; converted to a Buffer.
     * @returns {Promise<*>} Resolves with whatever `channel.sendToQueue`
     *     returns.
     */
    async function sendMessage(channel, queue, messageContent) {
        console.log(`sendMessage: sending message: ${messageContent}...`);
        const payload = Buffer.from(messageContent);
        const outcome = channel.sendToQueue(queue, payload);
        return outcome;
    }
    
    /**
     * Register a consumer on the queue that hands every delivered message to
     * `onNewMessage` and then acknowledges it.
     *
     * @param {object} ch - Channel exposing `consume` and `ack`.
     * @returns {*} Whatever `ch.consume` returns (a promise in amqplib's
     *     promise API), so callers may await consumer registration.
     */
    function startPollingForMessages(ch) {
        return ch.consume(queue, (msg) => {
            // amqplib calls the consumer with null when the broker cancels
            // it; there is nothing to process or acknowledge in that case.
            if (msg === null) {
                console.warn("startPollingForMessages: consumer was cancelled by the server.");
                return;
            }
            onNewMessage(msg);
            ch.ack(msg);
        });
    }
    
    /**
     * Per-message handler: the place for application work (database writes,
     * replies, ...). As written it only logs the message body.
     *
     * @param {object} msg - Delivered message; `msg.content` must provide
     *     `toString()`.
     * @returns {void}
     */
    function onNewMessage(msg) {
        const body = msg.content.toString();
        console.log("On new message:", body);
    }
    
    // Entry point of the example. start() wraps its whole body in try/catch
    // and logs failures itself, so the returned promise is not awaited here.
    start();