Search code examples
javascriptnode.jsrabbitmqamqpphp-amqplib

How to use channel.assertQueue function from amqplib library for node.JS?


I am developing a messaging app using RabbitMQ, and Node.JS. I am using amqplib for this purpose. I am new to Node.JS and finding few difficulties in understanding the syntax of amqplib.. For e.g. there is a function for declaring queue, that is

channel.assertQueue([queue, [options, [function(err, ok) {...}]]]);

I have been referring This from last 2-3 days but still I am not clear about these -> err and ok. How to use these parameters?

An example would be much much appreciated.


Solution

  • The ampqlib github page has some examples on how to use the library, using either callbacks or promises.

    I copied their first example and added some comments to explain what's happening.

    Might be worth checking their tutorial examples as well, that follows the official RabbitMQ tutorials.

    var amqp = require('amqplib/callback_api');
    var q = 'tasks';
    
    // connects to rabbitmq
    amqp.connect('amqp://localhost', function(err, conn) {
        // this function will be called when the connection is created
        // `err` will contain the error object, if any errors occurred
        // `conn` will contain the connection object
    
        if (err != null) bail(err); // calls `bail` function if an error occurred when connecting
        consumer(conn); // creates a consumer
        publisher(conn); // creates a publisher
    });
    
    function bail(err) {
        console.error(err);
        process.exit(1);
    }
    
    // Publisher
    function publisher(conn) {
        conn.createChannel(on_open); // creates a channel and call `on_open` when done
        function on_open(err, ch) {
            // this function will be called when the channel is created
            // `err` will contain the error object, if any errors occurred
            // `ch` will contain the channel object
    
            if (err != null) bail(err); // calls `bail` function if an error occurred when creating the channel
            ch.assertQueue(q); // asserts the queue exists
            ch.sendToQueue(q, new Buffer('something to do')); // sends a message to the queue
        }
    }
    
    // Consumer
    function consumer(conn) {
        var ok = conn.createChannel(on_open); // creates a channel and call `on_open` when done
        function on_open(err, ch) {
            // this function will be called when the channel is created
            // `err` will contain the error object, if any errors occurred
            // `ch` will contain the channel object
    
            if (err != null) bail(err); // calls `bail` function if an error occurred when creating the channel
            ch.assertQueue(q); // asserts the queue exists
            ch.consume(q, function(msg) { //consumes the queue
                if (msg !== null) {
                    console.log(msg.content.toString()); // writes the received message to the console
                    ch.ack(msg); // acknowledge that the message was received
                }
            });
        }
    }