Search code examples
javascriptnode.jsasync-awaitrabbitmqnode-amqp

How to get all messages using method consume in lib amqp.node?


Greeting guys.
Can you help me with asynchronnous in node.js?

Problem this:

I'm use amqplib module for work with RabbitMQ and here there method consume, who gives messages from RabbitMQ, but that method first return promise about he starts and after this promise starts, he call callbacks to get data from RabbitMQ, and i dont know how to catch when all messages will be send to my node js app.

For more explain, here my code and at end code at comments i wrote what i want:

/**
 * Here my test code
 *
 * requirng amqp.node lib
 */
let amqp = require('amqplib')
  , configConnection = { /* ..config options */ }
  , queue = 'users'
  , exchange = 'users.exchange'
  , type = 'fanout'

/**
 * declare annonymous function as async and immediately called it
 */
(async () => {
  /**
   * declare connection and channel with using async/await construction
   * who support version node.js >= 8.5.0
   */
  let conn = await amqp.connect(configConnection)
  let channel = await conn.createChannel()
  await channel.assertExchange(exchange, type)
  let response = await channel.assertQueue(queue)
  /**
   * response: { queue: 'users', messageCount: 10, consumerCount: 0 }
   */
  response = await channel.bindQueue(response.queue, exchange, '')
  response = await channel.consume(response.queue, logMessage, {noAck: false})
  /**
   * {noAck: false} false for not expect an acknowledgement
   */
  console.log('reading for query finish')

  function logMessage(msg) {
    console.log("[*] recieved: '%s'", msg.content.toString())
  }
})()
  /**
   * output will show:
   * reading for query finish
   * [*] recieved: 'message content'
   * [*] recieved: 'message content'
   * [*] recieved: 'message content'
   * ...
   *
   * But i'm need show message 'reading for query finish' after when
   * all consumes will executed
   *
   * Ask: How i can do this?
   */

Solution

  • I found answer on my question here.

    Answer in use: EventEmitter && Promise

    magic (for me) is here:
    await new Promise(resolve => eventEmitter.once('consumeDone', resolve))

    So ended code is:

    /**
     * Here my test code
     *
     * requirng amqp.node lib
     */
    let amqp = require('amqplib')
      , EventEmitter = require('events')
      , eventEmitter = new EventEmitter()
      , timeout = 10000
      , configConnection = { /* ..config options */ }
      , queue = 'users'
      , exchange = 'users.exchange'
      , type = 'fanout'
    
    /**
     * declare annonymous function as async and immediately called it
     */
    (async () => {
      /**
       * declare connection and channel with using async/await construction
       * who support version node.js >= 8.5.0
       */
      let conn = await amqp.connect(configConnection)
      let channel = await conn.createChannel()
      await channel.assertExchange(exchange, type)
      let response = await channel.assertQueue(queue)
      /**
       * response: { queue: 'users', messageCount: 10, consumerCount: 0 }
       */
      let messageCount = response.messageCount
      response = await channel.bindQueue(response.queue, exchange, '')
      response = await channel.consume(response.queue, logMessage(messageCount), {noAck: false})
      /**
       * {noAck: false} false for not expect an acknowledgement
       */
    
      /**
       * declare timeout if we have problems with emit event in consume
       * we waiting when event will be emit once 'consumeDone' and promise gain resolve
       * so we can go to the next step
       */
      setTimeout(() => eventEmitter.emit('consumeDone'), timeout)
      await new Promise(resolve => eventEmitter.once('consumeDone', resolve))
      console.log('reading for query finish')
    
      function logMessage(messageCount) {
        return msg => {
          console.log("[*] recieved: '%s'", msg.content.toString())
          if (messageCount == msg.fields.deliveryTag) {
            eventEmitter.emit('consumeDone')
          }
        }
    
      }
    })()