Search code examples
Tags: node.js, amazon-web-services, socket.io, rabbitmq, node-amqp

amqp.node won't detect a connection drop


We have a node.js script running a socket.io server whose clients consume messages from a RabbitMQ queue. We've recently migrated to Amazon AWS, and RabbitMQ is now a cluster of two machines (redundant instances). The AMQP connection is lost from time to time (this is a limitation that arises from a high-availability environment with redundant VMs, and we have to cope with it). If an attempt to reconnect is made, DNS chooses which instance to connect to (it is a cluster with data replication, so it doesn't matter which instance we connect to).

The problem is that the attempt to reconnect is never made; after a while, when the connection is lost, amqp.node apparently fails to notice that the connection has been lost. Also, the consumers stop receiving messages and the socket.io server simply stops accepting new connections.

We have a 55-second heartbeat timeout (not to be confused with the socket.io heartbeat timeout) set in the RabbitMQ URL, and we are listening for 'error' and 'close' events with amqp.node's callback API, but they are apparently never emitted. The queues expect the consumed messages to be ack'ed. We want the node script to detect a lost connection and terminate itself, so that the environment will automatically start a new process and establish a connection again.

Here is the code; maybe we are doing something wrong with the amqp.node callback API, or with something else.

// HTTP + Socket.IO bootstrap: Socket.IO is attached to a plain http server
// that wraps the Express app; Socket.IO's own logging is switched off.
var express = require('express');
var app = express(); // declared with `var` so it is not leaked as an implicit global
var http = require('http');
var serverio = http.createServer(app);
var io = require('socket.io').listen(serverio, { log: false });
var socket;
var allcli = [];

// ANSI escape sequences used to colour console output.
// Written as '\x1b' (ESC) rather than the legacy octal '\033', which has the
// same value but is a SyntaxError in strict mode and in ES modules.
var red, blue, green, magenta, orange, reset;
red     = '\x1b[31m';
blue    = '\x1b[34m';
green   = '\x1b[32m';
magenta = '\x1b[35m';
orange  = '\x1b[43m'; // SGR 43 is a yellow *background*, used here as "orange"
reset   = '\x1b[0m';

// Name of the RabbitMQ queue this process consumes.
var queue = 'ha.atualizacao_mobile';

// AMQP connection URL. `heartbeat` is the AMQP heartbeat period in seconds.
// NOTE(review): anything sitting between this process and the broker (e.g. a
// load balancer) that drops idle connections after about a minute can cut the
// link before a 55-second heartbeat keeps it alive; a clearly lower value
// (e.g. 30) avoids that race.
var urlRabbit = 'amqp://login:password@host?heartbeat=55'; // Amazon
var amqp = require('amqplib/callback_api');

// Verbose-diagnostics switch.
var debug = true;

// Lengthen Socket.IO's heartbeat interval and timeout, logging each setting
// before and after the change. Per the log text, the longer interval is meant
// to reduce battery consumption on the mobile clients.
// NOTE(review): the values are reported as seconds by the messages below —
// confirm against the Socket.IO version in use.
var initialHeartbeatInterval = io.get('heartbeat interval');
console.log("Original Socket.IO heartbeat interval: " + initialHeartbeatInterval + " seconds.");
io.set('heartbeat interval', 600); // 10 minutes
var appliedHeartbeatInterval = io.get('heartbeat interval');
console.log("Hearbeat interval changed to " + appliedHeartbeatInterval + " seconds to reduce battery consumption in the mobile clients.");

var initialHeartbeatTimeout = io.get('heartbeat timeout');
console.log("Original Socket.IO heartbeat timeout: " + initialHeartbeatTimeout + " seconds.");
io.set('heartbeat timeout', 660); // 11 minutes, i.e. one minute longer than the interval
var appliedHeartbeatTimeout = io.get('heartbeat timeout');
console.log("Heartbeat timeout set to " + appliedHeartbeatTimeout + " seconds.");


// Wire up the per-client event handlers each time a Socket.IO client connects.
io.sockets.on('connection', function onClientConnected(socket) {

    // Socket-level errors are only logged (highlighted); nothing else is done.
    function logSocketError(exc) {
        console.log(orange+"Ignoring exception: " + exc + reset);
    }

    // Handler for the client's 'send-indice' event.
    function onSendIndice(data) {
        // Some business logic
    }

    // Handler run when the client disconnects.
    function onDisconnect() {
        // Some business logic
    }

    socket.on('error', logSocketError);
    socket.on('send-indice', onSendIndice);
    socket.on('disconnect', onDisconnect);

});

/**
 * Handles one decoded queue message.
 *
 * Called from the RabbitMQ consumer callback in processRabbitConnection with
 * the result of JSON.parse() on the message content. Exceptions thrown here
 * are caught by that caller and logged.
 *
 * NOTE(review): the body is elided in this listing; presumably it pushes the
 * update to the connected socket.io clients — confirm against the real code.
 *
 * @param {*} data - Parsed JSON payload of the consumed message.
 */
function updatecli(data){
    // Some business logic
}

// Open the AMQP connection and start consuming.
//
// This process does not reconnect by itself: every failure path ends in a
// non-zero process exit so that the surrounding environment can start a fresh
// process, which then establishes a new connection.
amqp.connect(urlRabbit, null, function(err, conn) {
    if (err) {
        console.log("Error creating connection: " + err);
        // Staying alive without a broker connection would leave the socket.io
        // server running with nothing to deliver, so fail fast instead.
        return process.exit(1);
    }

    // 'error' is only logged here; termination is handled by the 'close'
    // listener below.
    conn.on('error', function(connErr) {
        console.log("Generated event 'error': " + connErr);
    });

    conn.on('close', function() {
        console.log("Connection closed.");
        // Every path that closes this connection is a failure path, so report
        // failure to the supervisor rather than a clean exit (code 0).
        process.exit(1);
    });

    processRabbitConnection(conn, function() {
        try {
            conn.close();
        } catch (closeErr) {
            // NOTE(review): amqplib is expected to throw when close() is called
            // on a connection that is already closed or closing; in that case
            // the 'close' listener above is what ends the process — confirm.
            console.log("Error closing connection: " + closeErr);
        }
    });
});

/**
 * Opens a channel on an established AMQP connection, asserts the queue named
 * by the file-level `queue` variable and starts consuming it. Every delivered
 * message is JSON-decoded, handed to updatecli() and then acknowledged.
 *
 * Any condition that stops consumption — channel creation failure, queue
 * assertion failure, consume failure, a channel-level error, or the broker
 * cancelling the consumer — is logged and reported through `finalize`.
 *
 * @param {Object} conn - Connection object providing createChannel(callback).
 * @param {Function} finalize - Called with no arguments when consumption
 *     cannot start or cannot continue. Invoked at most once.
 */
function processRabbitConnection(conn, finalize) {
    var finished = false;

    // Several failure paths can fire for the same outage; report only the first
    // so the caller's cleanup does not run twice.
    function finish() {
        if (finished) {
            return;
        }
        finished = true;
        finalize();
    }

    conn.createChannel(function(channelErr, channel) {
        if (channelErr) {
            console.log("Error creating channel: " + channelErr);
            return finish();
        }

        // Without a listener, an 'error' event emitted by the channel would be
        // thrown as an uncaught exception instead of going through finalize.
        channel.on('error', function(err) {
            console.log("Channel error: " + err);
            finish();
        });

        channel.assertQueue(queue, null, function(assertErr) {
            if (assertErr) {
                console.log("Error asserting queue " + queue + ": " + assertErr);
                return finish();
            }

            channel.consume(queue, function (msg) {
                // A null message means the consumer was cancelled on the broker
                // side: no further deliveries will arrive, so treat it as fatal
                // rather than silently going quiet.
                if (msg === null) {
                    console.log("Consumer cancelled by the broker for queue " + queue + ".");
                    return finish();
                }

                try {
                    updatecli(JSON.parse(msg.content.toString()));
                } catch (handlingErr) {
                    // Also reached when updatecli() throws, not only on bad JSON.
                    console.log("Error in JSON: " + handlingErr);
                }

                // Acknowledged even when handling failed, so a malformed message
                // is dropped instead of being redelivered.
                channel.ack(msg);
            }, null, function(consumeErr) {
                if (consumeErr) {
                    console.log("Error consuming message: " + consumeErr);
                    return finish();
                }
            });
        });
    });
}

// Start listening on port 9128 and log the time at which the server came up.
serverio.listen(9128, function onListening() {
    var startedAt = new Date();
    console.log('Server: Socket IO Online  - Port: 9128 - ' + startedAt);
});

Solution

  • Apparently the issue has been solved. The nearly 60-second heartbeat was the issue. It conflicts with the RabbitMQ load balancer, which checks every minute or so whether data has passed through the connection (if no data has passed, it breaks the connection). The AMQP connection stops receiving messages and the library apparently doesn't react to that. A lower heartbeat (e.g. 30 seconds) is necessary in order to avoid this situation.