Why can't Netty send messages continuously?


I wrote a server that sends a large number of messages to every client as soon as it connects.

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    while (true){
       content = arrayblockqueue.poll();
       ctx.writeAndFlush(content+"\r\n");
    }
}

After sending a few thousand messages, the channel stops sending anything. While debugging, I found that AbstractNioByteChannel.incompleteWrite was invoked and that SelectionKey.OP_WRITE was added to the selection key when the network was congested. Once OP_WRITE is set, AbstractNioUnsafe.isFlushPending() returns true, so flush() does not actually write anything. How can I make Netty recover from this situation? Or am I using Netty the wrong way?


Solution

  • Your handler method is invoked directly from an I/O thread. Until it returns, the I/O thread that called it cannot perform any I/O, which is why you don't see anything being written.

    Judging from your code, you want to take messages from a blocking queue and write them to a channel. Instead of going through a blocking queue, you can write to the channel directly from another thread. Almost all operations in Netty are thread safe. For example:

    public static void main(String[] args) throws Exception {
        ...
        Channel ch = ...;
        for (int i = 0; i < 1000000; i ++) {
            ch.writeAndFlush(String.valueOf(i) + "\r\n");
        }
        ...
    }
    
    // And your handler doesn't need an ArrayBlockingQueue.
    

    However, the code above will probably make Netty's event queue grow without bound, eventually resulting in an OutOfMemoryError. To keep write requests from piling up indefinitely, use the future returned by writeAndFlush() to wait for earlier writes to finish.

    for (int i = 0; i < 1000000; i ++) {
        ChannelFuture f = ch.writeAndFlush(String.valueOf(i) + "\r\n");
        if ((i + 1) % 100 == 0) {
            // Wait until the write request is actually finished
            // so that the event queue becomes empty.
            f.sync();  
        }
    }
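
To see why waiting on every 100th future keeps the queue bounded, here is a minimal plain-JDK sketch. It is only a model, not Netty code: a single-threaded executor stands in for the I/O thread, a CompletableFuture stands in for the ChannelFuture, and the names ThrottledWriterSketch and maxPendingWrites are made up for this illustration. Because the single thread finishes tasks in submission order, waiting for the newest write also means every earlier write has finished.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK model of the throttling idea; none of these names are Netty APIs.
final class ThrottledWriterSketch {

    /**
     * Queues `total` simulated writes and returns the largest number that
     * were pending at once, as seen by the producer. If `batch` is positive,
     * the producer waits after every `batch` writes; 0 means never wait.
     */
    static int maxPendingWrites(int total, int batch) {
        // One thread drains the queue in FIFO order, like a single I/O thread.
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        AtomicInteger pending = new AtomicInteger();
        int maxPending = 0;
        try {
            CompletableFuture<Void> last = CompletableFuture.completedFuture(null);
            for (int i = 0; i < total; i++) {
                // Count the write as pending before it is queued.
                maxPending = Math.max(maxPending, pending.incrementAndGet());
                // The simulated write finishes when the I/O thread runs this task.
                last = CompletableFuture.runAsync(() -> { pending.decrementAndGet(); }, ioThread);
                if (batch > 0 && (i + 1) % batch == 0) {
                    // Stand-in for f.sync(): block the producer until the
                    // newest write, and so every earlier one, has finished.
                    last.join();
                }
            }
            last.join(); // drain the tail before shutting down
        } finally {
            ioThread.shutdown();
        }
        return maxPending;
    }

    public static void main(String[] args) {
        System.out.println("max pending, waiting every 100 writes: " + maxPendingWrites(100_000, 100));
        System.out.println("max pending, never waiting: " + maxPendingWrites(100_000, 0));
    }
}
```

With a batch of 100, the producer can never see more than 100 writes pending. Without waiting, the figure depends only on how fast the consumer thread happens to drain the queue. As in the real answer, the waiting has to happen on a thread other than the I/O thread, otherwise the thread that should complete the write is the one being blocked.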