Search code examples
javasocketsserializationloadnio

Java NIO SocketChannels: Serialization issue with multiple objects and high system load


I am using SocketChannels from java.nio to send objects between several peers in a p2p network. Each peer has a ServerSocketChannel where other peers can connect to. I am sending serialized objects over these SocketChannels. The base of my code is basically the NIO tutorial from http://rox-xmlrpc.sourceforge.net/niotut/

All message I send implement the same interface, so I can do the deserialization on the receiver side. The following code does that (Updated with byte counter, see below):

private void readKey(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer readBuffer = ByteBuffer.allocate(4);

    channel.read(readBuffer);
    readBuffer.rewind();
    int numBytes = readBuffer.getInt();

    readBuffer = ByteBuffer.allocate(numBytes);
    int read = channel.read(readBuffer);

    Message msg = Message.deserialize(readBuffer);  
    this.overlay.addIncomingMessage(msg);
}

The sending is done via a serialization of the object, the message is serialized and added to a queue, the interest ops for the channels are changed, and the serialized object is sent.

private void writeKey(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    synchronized (this.pendingData) {
        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);

        // Write until there's not more data ...
        while (!queue.isEmpty()) {
            ByteBuffer buf = (ByteBuffer) queue.get(0);

                            // UPDATE for message length
            ByteBuffer len = ByteBuffer.allocate(4);
            len.putInt(buf.remaining());
            len.rewind();

            socketChannel.write(len);
            socketChannel.write(buf);
            if (buf.remaining() > 0) {
                // ... or the socket's buffer fills up
                break;
            }
            queue.remove(0);
        }

        if (queue.isEmpty()) {
            key.interestOps(SelectionKey.OP_READ);
        }
    }       
}

This all works fine, as long as the system load is low. When I start sending lots of messages, some of the messages that are written to the channel are not received. When I add delays to my code to slow things down, everything works as expected.

I think there may be a problem when multiple messages are sent before the receiver reads the channels' buffers to create objects, but I don't know how to solve this.

I appreciate any hints or ideas.

Regards, Christoph

UPDATE: After the first hint, I added the number of bytes transferred to the sending side and read only those number of bytes on the receiver, but no effect.


Solution

  • you seem to be assuming that the receiver will be receiving the message "packets" in the same chunks in which the sender is sending them. this is not going to be the case. the underlying socket may arbitrarily split/join the chunks you are sending.

    in order to implement a message based protocol like this, you need to manage the message chunks. you need to treat the socket like a stream of data when reading it, and not assume that the buffer of data received from a read call corresponds to a single message. one way to implement a message based protocol over a stream is to first write the message length, then write the message bytes. on the receiving end, you first read the message length, then only consume that many bytes when parsing the message.

    UPDATE:

    if you are still losing messages, i would guess that the problem is with your synchronization logic. it's hard to tell from the code you included, but how are the individual queues being synchronized (i only see a top-level lock on the pendingData list). also, what kind of synchronization are you using on the receiving end?