Search code examples
node.js callback timeout rabbitmq

RabbitMQ request always times out


I have a weird problem where my callback reply is never published and the request times out, even though the handler for the queue runs. This happens only on some specific queues, and after it happens once, I cannot make any other requests from the client, even ones that previously worked; they all time out. I have to restart the client and the server to make it work again. This is the code where it is happening, and I can't seem to understand what's wrong.

This is the Server.js file where I create the queues. I have several such queues; this is one of them.

var amqp = require('amqp');
var util = require('util');
var cnn = amqp.createConnection({host:'127.0.0.1'});
var getCart = require('./services/getCart');

// Once the AMQP connection is ready, serve RPC requests arriving on
// 'getCart_queue' and publish each result to the requester's replyTo queue.
cnn.on('ready', function() {

    cnn.queue('getCart_queue', function(q){
        q.subscribe(function(message, headers, deliveryInfo, m){
            // Guards against answering the same request twice.
            var replied = false;

            // Publishes the handler's result back to the requester. A failure is
            // logged first so that it is visible on the server instead of only
            // showing up as a timeout on the client.
            function reply(err, res){
                if (replied) {
                    return;
                }
                replied = true;

                if (err) {
                    console.error('getCart_queue handler failed:', err);
                }

                cnn.publish(m.replyTo, res, {
                    contentType:'application/json',
                    contentEncoding:'utf-8',
                    correlationId:m.correlationId
                });
            }

            try {
                getCart.handle_request(message, reply);
            } catch (ex) {
                // If the reply was already attempted, the exception came from
                // publishing itself; let it propagate rather than hiding it.
                if (replied) {
                    throw ex;
                }
                // A synchronous failure in the handler still gets an (empty)
                // reply, matching what the handler's own error path sends.
                reply(ex, null);
            }
        });
    });
});

Here, the handle_request function completes successfully, but the callback reply never goes through and the request always times out on the other end.

var cart = require('../models/cart');

/**
 * Looks up the cart documents whose `id` equals `msg.id`, populating the
 * 'users' and 'ads' paths, and reports the outcome through `callback`.
 *
 * @param {Object} msg - request payload; only `msg.id` is read.
 * @param {Function} callback - called as `callback(null, results)` on success
 *   or `callback(err, null)` when the query reports an error.
 */
function handle_request(msg, callback) {
    var ownerId = msg.id;
    var query = cart.find({id: ownerId}).populate('users ads');

    query.exec(function onCartQueryDone(queryError, carts) {
        if (queryError) {
            console.log(queryError);
            callback(queryError, null);
            return;
        }

        console.log(carts);
        callback(null, carts);
    });
}

exports.handle_request = handle_request;

This is how I am making the request:

 // Ask the 'getCart_queue' worker for the cart belonging to `id`.
 var msg_payload = {"id":id};
    mq_client.make_request('getCart_queue', msg_payload, function(err, results){
      if (err) {
        // NOTE(review): presumably an RPC timeout is delivered here as `err`
        // (the rpc module calls back with an Error) - confirm against the
        // mq_client wrapper. Surface it instead of continuing with no results.
        console.error(err);
        return;
      }
      console.log(results); // never prints 
       //stuff that is never reached
});

These are my RPC files. I don't think there should be anything wrong with them, as some other queues work fine. This is the error shown on the client:

GET /getCart - - ms - -
Error: timeout 6ee0bd2a4b2ba1d8286e068b0f674d8f
    at Timeout.<anonymous> (E:\Ebay_client\rpc\amqprpc.js:32:18)
    at Timeout.ontimeout [as _onTimeout] (timers.js:341:34)
    at tryOnTimeout (timers.js:232:11)
    at Timer.listOnTimeout (timers.js:202:5)

Hope the information is not vague, if you need more, please let me know. Thanks!

I think the error is in this file: when I debugged the RabbitMQ server side, the callback was being called and it had the correlation ID as well as the replyTo value, so the reply is apparently not being picked up here.

var amqp = require('amqp')
  , crypto = require('crypto');

// Module-wide settings for the RPC client:
//   TIMEOUT          - delay (ms) passed to setTimeout before a request is failed
//   CONTENT_TYPE     - contentType option sent with each published request
//   CONTENT_ENCODING - contentEncoding option sent with each published request
//   self             - declared at module scope for the AmqpRpc code in this file
var TIMEOUT = 8000,
    CONTENT_TYPE = 'application/json',
    CONTENT_ENCODING = 'utf-8',
    self;

exports = module.exports = AmqpRpc;

/**
 * RPC client on top of an AMQP connection: publishes a request to a queue and
 * routes the reply back to the caller by correlation id.
 *
 * All state lives on the instance. Closures capture the instance in a local
 * variable rather than a module-level one, so several instances (or several
 * in-flight calls) cannot redirect each other's replies and timeouts.
 *
 * @param {Object} connection - connection object providing `queue()` and
 *   `publish()`.
 */
function AmqpRpc(connection){
  this.connection = connection;
  // Pending requests keyed by correlation id; each entry is { callback, timeout }.
  this.requests = {};
  // Name of the reply queue; false until setupResponseQueue has created it.
  this.response_queue = false;
}


/**
 * Publishes `content` to `queue_name` and waits for the matching reply.
 *
 * @param {string} queue_name - queue to publish the request to.
 * @param {*} content - request body handed to `connection.publish`.
 * @param {Function} callback - called as `callback(null, message)` when the
 *   reply arrives, or `callback(Error("timeout <correlationId>"))` if no reply
 *   is dispatched within TIMEOUT milliseconds.
 */
AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){

  var rpc = this;
  var correlationId = crypto.randomBytes(16).toString('hex');

  var tId = setTimeout(function(){
    // Drop the entry before notifying the caller so that a throwing callback
    // cannot leave a stale request behind.
    delete rpc.requests[correlationId];
    callback(new Error("timeout " + correlationId));
  }, TIMEOUT);

  rpc.requests[correlationId] = {
    callback: callback,
    timeout: tId
  };

  // The reply queue must exist before publishing, because its name is sent
  // along as replyTo.
  rpc.setupResponseQueue(function(){
    rpc.connection.publish(queue_name, content, {
      correlationId: correlationId,
      contentType: CONTENT_TYPE,
      contentEncoding: CONTENT_ENCODING,
      replyTo: rpc.response_queue
    });
  });
};


/**
 * Ensures the exclusive reply queue exists and is subscribed, then calls `next`.
 *
 * @param {Function} next - invoked once `response_queue` is set (immediately
 *   when it already is).
 */
AmqpRpc.prototype.setupResponseQueue = function(next){

  if(this.response_queue) return next();

  var rpc = this;

  rpc.connection.queue('', {exclusive:true}, function(q){

    rpc.response_queue = q.name;

    q.subscribe(function(message, headers, deliveryInfo, m){
      var correlationId = m.correlationId;

      // Own-property check: an id such as "toString" must not match something
      // inherited from Object.prototype.
      if(!Object.prototype.hasOwnProperty.call(rpc.requests, correlationId)){
        return;
      }

      var entry = rpc.requests[correlationId];
      clearTimeout(entry.timeout);
      delete rpc.requests[correlationId];

      entry.callback(null, message);
    });
    return next();
  });
};

Solution

  • The problem was not with RabbitMQ but with my queries in the handle_request function and with the code that runs after responding to the request.

    For others who run into this problem: check and double-check every statement, because the error will not show up in the console; all you will see is a timeout.