Tags: java, tcp, project-reactor, tcpserver, reactor-netty

Netty TCP Server only receiving 536 bytes from client. Remaining is truncated


I wrote a TCP Server application using project reactor netty. It's a simple application that receives a byte array request message from a client, processes it, then returns a byte array response message to the client.

I am running into an issue where, no matter how much data the client actually sends, my TCP Server reads no more than 536 bytes of it. The rest is truncated. The first 4 bytes of the client's byte[] message indicate how much data was sent, and that length is always more than the data I actually receive from netty in my handler. This is where things get interesting.

I was initially able to reproduce the issue my client was having by connecting and sending data to the TCP Server app from my local PC using a test client I wrote. I then fixed the issue for my test client by adding the SO_RCVBUF/SO_SNDBUF child options to my TCP Server and setting them to a larger value (4096 bytes). I can now see the full message being received and processed by my TCP server. However, my client's message is still being truncated even after adding these options.

    TcpServer tcpServer = TcpServer.create();
    // TcpServer is immutable: runOn() returns a new instance, so the result must be kept
    if (someTcpServerConfigObject.isLoopResourcesEnabled()) {
        LoopResources loopResources = LoopResources.create("prefix", 1, 4, true);
        tcpServer = tcpServer.runOn(loopResources);
    }
    DisposableServer someTcpServer = tcpServer
            .host("12.123.456.789")
            .wiretap(true)
            .doOnBind(server -> log.info("Starting listener..."))
            .doOnBound(server -> log.info("Listener started on host: {}, port: {}", server.host(), server.port()))
            .port(12345)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.AUTO_CLOSE, false)
            .childOption(ChannelOption.TCP_NODELAY,true)
            .childOption(ChannelOption.AUTO_CLOSE,false)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.SO_RCVBUF, 4096)
            .childOption(ChannelOption.SO_SNDBUF, 4096)
            .doOnConnection(connection -> {
                InetSocketAddress socketAddress = (InetSocketAddress) connection.channel().remoteAddress();
                log.info("Client has connected. Host: {}, Port: {}",
                            socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
            })
            .doOnChannelInit((observer, channel, remoteAddress) ->
                    channel.pipeline()
                            .addFirst(new LoggingHandler(MyTcpServer.class))
                            .addFirst(new TcpServerHandler())
            )
            .handle((inbound, outbound) ->
                    inbound
                            .receive()
                            .asByteArray()
                            .flatMap(req -> processRequest(req))
                            //above processRequest() returns a java.nio.ByteBuffer
                            //doing rsp.array() to convert to byte[]
                            .flatMap(rsp -> outbound.sendByteArray(Flux.just(rsp.array())))
                            .doOnError(throwable -> log.error("Error processing the request: {}", throwable.getMessage(), throwable))
            ).bindNow();
    someTcpServer.onDispose().block();
}

And then below is the TcpServerHandler class that I added as a handler above.

@Slf4j
public class TcpServerHandler extends ChannelDuplexHandler {

    private final AtomicLong startTime = new AtomicLong(0L);
    private final AtomicLong endTime = new AtomicLong(0L);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // keep the raw bytes; formatHex() returns a String, so it is only used for logging
        byte[] data = ByteBufUtil.getBytes((ByteBuf) msg);
        InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        log.info("Receiving message from: Host: {}, Port: {}. Data: {}", socketAddress.getAddress().getHostAddress(),
                socketAddress.getPort(), HexFormat.of().formatHex(data));
        byte[] byteArrayContainingFourByteLength = new byte[4];
        System.arraycopy(data, 0, byteArrayContainingFourByteLength, 0, 4);
        ByteBuffer wrapped = ByteBuffer.wrap(byteArrayContainingFourByteLength);
        short actualLength = wrapped.getShort();
        log.info("# of bytes received from netty: {}", data.length);
        log.info("# of bytes client actually sent: {}", actualLength);
        startTime.set(System.nanoTime());
        ctx.fireChannelRead(msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        endTime.set(System.nanoTime());
        log.info("Took {} ms to process", Duration.ofNanos(endTime.get() - startTime.get()).toMillis());
        super.write(ctx, msg, promise);
    }
}
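
A side note on the length parsing in the handler above: it wraps the 4 prefix bytes but then calls getShort(), which reads only the first two of them. Whether that is correct depends on the client's wire format, which is not shown here. The sketch below assumes, purely for illustration, that the prefix is a 32-bit big-endian integer; PrefixParsing and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration, assuming a 32-bit big-endian length prefix.
final class PrefixParsing {

    // Reads only the first two bytes of the prefix, as the handler does.
    static short firstTwoBytes(byte[] prefix) {
        return ByteBuffer.wrap(prefix).getShort();
    }

    // Reads all four bytes of the prefix as one integer.
    static int allFourBytes(byte[] prefix) {
        return ByteBuffer.wrap(prefix).getInt();
    }

    public static void main(String[] args) {
        byte[] prefix = ByteBuffer.allocate(4).putInt(1200).array(); // 00 00 04 B0
        System.out.println(firstTwoBytes(prefix)); // prints 0
        System.out.println(allFourBytes(prefix));  // prints 1200
    }
}
```

If getShort() does return the expected length for the real client, the length presumably sits in the first two bytes and the remaining two carry something else.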

Please let me know if there are any other options, child options, or handlers on my TCP Server that I can configure to resolve this. After a bit of research I found this Wikipedia page on maximum segment size (MSS). It mentions the same 536-byte figure, so I'm wondering whether my client's issue is related and whether I can configure this through netty in some way.
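
For context on where 536 comes from: it is the default IPv4 MSS, that is, the 576-byte datagram every IPv4 host must accept minus a 20-byte IP header and a 20-byte TCP header. The sketch below only works through that arithmetic. MssArithmetic is a hypothetical name, and the idea that each 536-byte read corresponds to exactly one segment is an assumption, not something the logs above prove.

```java
// Hypothetical illustration of the arithmetic, not a Netty API.
final class MssArithmetic {
    static final int MIN_IPV4_DATAGRAM = 576; // size every IPv4 host must be able to accept
    static final int IPV4_HEADER = 20;        // header without options
    static final int TCP_HEADER = 20;         // header without options

    // Payload bytes left in a minimum-size datagram after both headers.
    static int defaultMss() {
        return MIN_IPV4_DATAGRAM - IPV4_HEADER - TCP_HEADER;
    }

    // Number of segments needed to carry messageBytes at the given MSS (ceiling division).
    static int segmentsFor(int messageBytes, int mss) {
        return (messageBytes + mss - 1) / mss;
    }

    public static void main(String[] args) {
        System.out.println(defaultMss());                    // prints 536
        System.out.println(segmentsFor(1204, defaultMss())); // prints 3
    }
}
```

Because TCP delivers a byte stream rather than messages, a 1204-byte message sent under that MSS can reach the server as three separate reads, so the first read alone would look truncated.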


Solution

  • I ended up going with a custom solution. As mentioned in my question, the first 4 bytes of the client's message are a numeric value indicating the length of the message. I modified the channelRead() of our custom handler to check the incoming data netty read from the socket and see whether that 4-byte length value matches the actual length of the data read. If it matches, I have received the full message and call ctx.fireChannelRead() to begin processing. If not, I store the data read so far in a temporary ByteBuf, then wait for the next channelRead() and repeat the process until all data is read. Code example below:

    @Slf4j
    public class TcpServerHandler extends ChannelDuplexHandler {
    
        ByteBuf byteBuf = Unpooled.buffer();
        private final AtomicInteger prefixLength = new AtomicInteger(0);
        private final AtomicInteger actualMessageLength = new AtomicInteger(0);
    
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
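            //capture the data netty read from the socket (getBytes() copies it)
            byte[] message = ByteBufUtil.getBytes((ByteBuf) msg);
            //the inbound buffer is not forwarded, so release it here to avoid a leak
            io.netty.util.ReferenceCountUtil.release(msg);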
            
            //store the data into a temporary byte array buffer
            byteBuf.writeBytes(message);
    
            //increment the total message length
            actualMessageLength.getAndAdd(message.length);
    
            //extract the 1st 4 bytes of the message to get the length value
            if(prefixLength.get() == 0) {
                ByteBuffer wrapped = ByteBuffer.wrap(extractByteValueByIndex(0, 4, message));
                prefixLength.getAndSet(wrapped.getShort() + 4);
            }
    
            //did we receive all the data?
            if(actualMessageLength.get() == prefixLength.get()) {
                //Yes, reset the global variables to prepare for the next incoming request
                //then call ctx.fireChannelRead() and begin processing the message
                log.info("Received the entire message, begin processing...");
                actualMessageLength.set(0);
                prefixLength.set(0);
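                //copy only the bytes written so far; byteBuf.array() would return the whole backing array, unused capacity included
                ctx.fireChannelRead(ByteBufUtil.getBytes(byteBuf));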
                byteBuf.clear();
            } else {
                //No, wait for the next channelRead() and repeat until all data is read
                log.info("Length mismatch on channel read. Actual: {}, Expected: {}, reading in more data...", actualMessageLength, prefixLength);
            }
        }
    
        private byte[] extractByteValueByIndex(int startIndex, int size, byte[] message){
            byte[] byteArray = new byte[size];
            System.arraycopy(message, startIndex, byteArray, 0, size);
            return byteArray;
        }
    }
    

    If this 536-byte limit really is related to the MSS (Maximum Segment Size), it would be nice if netty exposed this value as a channel option so we could configure it. I came across this Stack Overflow question from 2017 from someone else asking for the same support.
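
The accumulate-until-complete idea described above can also be sketched without any Netty types, which makes it easy to test on its own. This is a minimal sketch, not the handler from this answer: LengthPrefixedFramer is a hypothetical helper, and it assumes the prefix is a 32-bit big-endian payload length that does not count itself. Unlike the handler above, it also copes with a prefix split across reads and with several messages arriving in one read.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Netty or reactor-netty.
final class LengthPrefixedFramer {
    private static final int PREFIX_BYTES = 4; // assumption: 32-bit big-endian payload length

    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    /** Feeds the bytes of one read; returns every frame it completes, prefix stripped. */
    List<byte[]> feed(byte[] chunk) {
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(joined);
        while (buf.remaining() >= PREFIX_BYTES) {
            buf.mark();
            int payloadLength = buf.getInt();
            if (payloadLength < 0) {
                throw new IllegalStateException("negative length prefix");
            }
            if (buf.remaining() < payloadLength) {
                buf.reset(); // incomplete frame: rewind to the prefix and wait for more data
                break;
            }
            byte[] payload = new byte[payloadLength];
            buf.get(payload);
            frames.add(payload);
        }
        // Keep whatever is left over for the next read.
        pending = new byte[buf.remaining()];
        buf.get(pending);
        return frames;
    }

    public static void main(String[] args) {
        byte[] wire = ByteBuffer.allocate(4 + 1200).putInt(1200).put(new byte[1200]).array();
        LengthPrefixedFramer framer = new LengthPrefixedFramer();
        // Deliver the message in 536-byte pieces, as the server appears to see it.
        for (int off = 0; off < wire.length; off += 536) {
            byte[] piece = Arrays.copyOfRange(wire, off, Math.min(wire.length, off + 536));
            System.out.println(framer.feed(piece).size()); // prints 0, then 0, then 1
        }
    }
}
```

Netty also ships io.netty.handler.codec.LengthFieldBasedFrameDecoder, which performs this kind of reassembly inside the pipeline and may be worth trying before maintaining a custom handler.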