Search code examples
node.jsrabbitmqpublish-subscribeproducer-consumer

Worker blocked on consume - RabbitMQ - Node.js


I've got a problem with RabbitMQ in Node.js . I'm trying to implement a Pub/Sub connector in which each user has its own queue to poll to get the messages. When I publish a message via Postma and the user consumes it, I have no problem (I get the message correctly), while if the user tries to consume the message on the queue (but no messages are present), it gets stuck until a new publish is made, but I have no way to get the message. What I want to do is to abort the consume and re-try later. Could you help me with that?

get_queue works fine and gets the user's personal queue.

Route

app.post('/consume', (req, res) => {
        project.get_queue(req, res)
            .then(result1 => {return project.consume(req, res, result1.message);})
            .then(result2 => {res.status(result2.status).json({ message: result2.message });})
            .catch(err => res.status(err.status >= 100 && err.status < 600 ? err.status : 500).send({message: err.message}))
});

Controller

 exports.consume = function(req, res, user) {
        return new Promise((resolve,reject) => {
            const queue = user.queueName;
            amqpConn.createChannel(function(err, ch) {
                if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });
                ch.on("error", function(err){
                    console.error("[AMQP] channel error", err.message);
                });
                ch.on("close", function() {
                    console.log("[AMQP] channel closed");
                });

                ch.assertQueue(queue, {durable: true},function(err, _ok){
                    if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });
                    console.log("#Msg: ", _ok.messageCount)
                    if(_ok.messageCount === 0) {
                        reject({ status: 400, message: 'No consuming' });
                    }
                    else {
                    var json = [];

                    var topic = ch.consume(queue, function(msg, err) {
                        work(msg, function(ok) {
                    try {
                      if (ok) {
                        ch.ack(msg);
                        json.push(JSON.parse(msg.content.toString()));
                        resolve({ status: 200, message: json});
                            }
                      else {
                        ch.reject(msg, true);
                                resolve({ status: 200, message: json});
                            }
                    } catch (e) {
                            reject({ status: 404, message: "niente da consumare"});
                      closeOnErr(e);
                    }});
            }, {noAck: false}); 
        })
    })
};

This is what I did so far.

exports.consume = function(req, res, user) {
        return new Promise((resolve,reject) => {
            const queue = user.queueName;
            amqpConn.createChannel(function(err, ch) {
                if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });;
                ch.on("error", function(err){
                    console.error("[AMQP] channel error", err.message);
                });
                ch.on("close", function() {
                    console.log("[AMQP] channel closed");
                });

                var json = [];
                iterate(ch, queue, json);
            })
        })
    };

    function iterate(ch, queue, json) {
     ch.get(queue, {
         noAck: false
     }, function (err, msg) {
         if (!msg) return resolve({
             status: 200,
             message: json
         });
         work(msg, function (ok) {
                      console.log("Errore?");
                     console.log("MSG consumed: ", msg.content.toString());
             try {
                 if (ok) {
                     ch.ack(msg);
                     json.push(JSON.parse(msg.content.toString()));
                     return iterate(ch, queue, json);
                 }
                                 ch.reject(msg, true);
                 return resolve({
                     status: 200,
                     message: json
                 });
             } catch (e) {
                 reject({
                     status: 404,
                     message: "niente da consumare"
                 });
                 closeOnErr(e);
             }
     });
 })}

Solution

  • Using ch.get with an iterator :

    exports.consume = function(req, res, user) {
      return new Promise((resolve, reject) => {
          const queue = user.queueName;
          amqpConn.createChannel(function(err, ch) {
              if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });;
              ch.on("error", function(err){
                  console.error("[AMQP] channel error", err.message);
              });
              ch.on("close", function() {
                  console.log("[AMQP] channel closed");
              });
    
              var json = [];
              return iterate(ch, queue, json, resolve, reject);
          });
      });
    };
    
    function iterate(ch, queue, json, resolve, reject) {
      ch.get(queue, {
        noAck: false
      }, function (err, msg) {
        if (!msg) return resolve({
            status: 200,
            message: json
        });
        work(msg, function (ok) {
            console.log("Errore?");
            console.log("MSG consumed: ", msg.content.toString());
            try {
                if (ok) {
                    ch.ack(msg);
                    json.push(JSON.parse(msg.content.toString()));
                    return iterate(ch, queue, json, resolve, reject);
                }
                ch.reject(msg, true);
                return resolve({
                    status: 200,
                    message: json
                });
            } catch (e) {
                closeOnErr(e);
                return reject({
                    status: 404,
                    message: "niente da consumare"
                });
            }
        });
      });
    }
    

    It will run the function iterate(), if msg is false, it will resolve, else it will handle the message, if result from work is not ok, it will reject and stop iterating, if result is ok, it will iterate again;