Search code examples
node.jstcploadbuffergarbage

Node js TCP server, socket.on('data') - data buffer contains garbage data on high load


i use the net server of node js and use the socket.on('data') function to receive data. To parse the TCP messages I use the parse buffer method. This uses the first 4 bytes as the length of the TCP message so that I can read from the TCP stream and form individual commands.In summary what happens at high load is that there is some garbage data returned as part of the TCP stream which causes problems.

function onConnect(client) {
var accumulatingBuffer = new Buffer(0);
var totalPacketLen = -1;
var accumulatingLen = 0;
var recvedThisTimeLen = 0;

client.on('data', function (data) {
    parseBuffer(client, data, accumulatingBuffer, totalPacketLen, accumulatingLen, recvedThisTimeLen);
});
}

and here is the parsebuffer method.

function parseBuffer(client, data, accumulatingBuffer, totalPacketLen, accumulatingLen, recvedThisTimeLen) {
recvedThisTimeLen = Buffer.byteLength(data);
var tmpBuffer = new Buffer(accumulatingLen + recvedThisTimeLen);
accumulatingBuffer.copy(tmpBuffer);
data.copy(tmpBuffer, accumulatingLen); // offset for accumulating
accumulatingBuffer = tmpBuffer;
tmpBuffer = null;
accumulatingLen = accumulatingLen + recvedThisTimeLen;

if (accumulatingLen < PACKETHEADERLEN) { 
    return;
} else if (accumulatingLen === PACKETHEADERLEN) { 
    packetHeaderLen
    return;
} else {
    //a packet info is available..
    if (totalPacketLen < 0) {
        totalPacketLen = accumulatingBuffer.readUInt32BE(0);
    }
}
while (accumulatingLen >= totalPacketLen + PACKETHEADERLEN) {
    var aPacketBufExceptHeader = new Buffer(totalPacketLen); // a whole packet is available...
    accumulatingBuffer.copy(aPacketBufExceptHeader, 0, PACKETHEADERLEN, PACKETHEADERLEN + totalPacketLen);

    ////////////////////////////////////////////////////////////////////
    //process packet data
    var stringData = aPacketBufExceptHeader.toString();
    try {
        var JSONObject = JSON.parse(stringData);
        handler(client, JSONObject);


        var newBufRebuild = new Buffer(accumulatingBuffer.length - (totalPacketLen + PACKETHEADERLEN)); // we can reduce size of allocatin
        accumulatingBuffer.copy(newBufRebuild, 0, totalPacketLen + PACKETHEADERLEN, accumulatingBuffer.length);

        //init      
        accumulatingLen = accumulatingLen - (totalPacketLen + PACKETHEADERLEN); //totalPacketLen+4
        accumulatingBuffer = newBufRebuild;
        newBufRebuild = null;
        totalPacketLen = -1;

        //For a case in which multiple packets are transmitted at once.
        if (accumulatingLen <= PACKETHEADERLEN) {
            //need to get more data -> wait..
            return;
        } else {
            totalPacketLen = accumulatingBuffer.readUInt32BE(0);
        }
    } catch (ex) {
        console.log(ex + ' unable to process data');
        return;
    }
}
}

All is well until there is high simulated load using a bunch of clients sending messages fast. At that point of time inside the ParseBuffer method the first line "data.length"returns more than the length of the TCP data. This leads into the code reading garbage as UInt32BE which cause a very high value in totalpacketlength(which tells the next packets length). This leads to lost messages. Am I missing something. Please help.


Solution

  • When you do this in your parseBuffer() function:

    accumulatingBuffer = tmpBuffer;
    

    this is just assigning tmpBuffer to the function argument named accumulatingBuffer. It is NOT changing the accumulatingBuffer variable in your onConnect() method. As such, when you get a partial buffer, you lose the accumulated part. The same issue is true of the other arguments you are passing to parseBuffer(). Assigning to them inside parseBuffer() is not changing the variables of the same name in onConnect().

    There are probably simpler ways to write this, but the easiest way to keep with your same structure is to not pass individual variables, but to pass a single object that has those variables as properties on the object. Then, when you assign to the properties, you can get to those new values from within onConnect().

    The general structure would look like this:

    function onConnect(client) {
        var args = {};
        args.accumulatingBuffer = new Buffer(0);
        args.totalPacketLen = -1;
        args.accumulatingLen = 0;
        args.recvedThisTimeLen = 0;
    
        client.on('data', function (data) {
            parseBuffer(client, data, args);
        });
    }
    

    And, then make the corresponding changes in parseBuffer() to access the arguments as properties on the args object. Since objects are passed by pointer, when you assign to properties on the args object from within parseBuffer, those will be visible in args object in the onConnect method.


    FYI, I did not follow the entire logic elsewhere in the function so there could be other errors too. This code seems quite complex with a lot of buffer copies for what the fairly common tasks that it is trying to do. It's also the kind of code that has probably been written many times before and probably even exists in some pre-built libraries.