Netty ChannelInboundHandlerAdapter async/multithreading


I'm having trouble grasping the concepts behind multithreading in Netty: EventLoopGroup (MultithreadEventLoopGroup), MultithreadEventExecutorGroup, and DefaultEventExecutorGroup.

I am trying to understand how the server handles multiple clients that simultaneously send requests, each of which executes business logic and CRUD operations that add to the RTT. Below is my Netty server code, which works, but I want to understand exactly how it behaves with concurrent users and multiple open channels.

I have a simple ServerBootstrap:

@Component
@RequiredArgsConstructor
public class SocketServer {

    private final ContextAwareLogger logger;
    private final ServerInitializer serverInitializer;
    private final NioEventLoopGroup bossGroup;
    private final NioEventLoopGroup workerGroup;

    private Channel mainChannel;

    @PostConstruct
    public void start() {
        try {
            ServerBootstrap bootstrap = init();
            mainChannel = bootstrap.bind(8484).sync().channel(); // save the main channel so we can cleanly close it when app is shutdown
            logger.info("Netty server started...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void stop() throws InterruptedException {
        logger.info("Shutting down Netty server");
        mainChannel.close().sync(); // close the listening channel first so no new connections are accepted
        bossGroup.shutdownGracefully().sync();
        workerGroup.shutdownGracefully().sync();
        logger.info("Netty Server shutdown complete.");
    }

    private ServerBootstrap init() {
        return new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
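                .option(ChannelOption.SO_BACKLOG, 5000) // applies to the listening server channel
                .childOption(ChannelOption.TCP_NODELAY, true) // per-connection options belong on the child channels
                .childOption(ChannelOption.SO_KEEPALIVE, true)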
                .childHandler(serverInitializer);
    }

}

ChannelInitializer:

@Component
@RequiredArgsConstructor
public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    private final PacketDecoder packetDecoder;
    private final ServerHandler serverHandler;
    private final PacketEncoder packetEncoder;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                .addLast("decoder", packetDecoder) // ByteArrayDecoder
                .addLast("encoder", packetEncoder) // ByteArrayEncoder
                .addLast("inbound", serverHandler); // ChannelInboundHandlerAdapter
    }

}

ChannelInboundHandlerAdapter:

@Component
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Autowired
    private SomeService someService;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

       // contains db access
       byte[] accept = someService.validateClient(ctx.channel());

       ctx.channel().writeAndFlush(accept);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

       // may contain db access
       byte[] response = someService.processPacket(ctx.channel(), msg);

       ctx.channel().writeAndFlush(response);
    }
}

Now, when a client connects, I understand that a new Channel will be opened and the handlers will be reused. The requirement is that each client's request/response must be processed immediately, without waiting for another client's CRUD operations to finish.

Are my channelRead, channelActive, etc. async because I am using NioEventLoopGroup (i.e., will each client's channel operations run independently of each other)?

If a single client sends multiple requests in series, are they guaranteed to be handled in the same order?

Do I need to specify a DefaultEventExecutorGroup for my inbound handler? (https://stackoverflow.com/a/28305019/1738539)


Solution

  • You would need either to use a DefaultEventExecutorGroup for your ServerHandler or to dispatch validateClient(...) / processPacket(...) to your own thread pool. Failing to do so will block the EventLoop thread, so no other I/O can be processed on that EventLoop until the blocking operation completes.
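
To make the second option concrete, here is a minimal sketch of the "dispatch to your own thread pool" idea using only java.util.concurrent, so it runs without Netty on the classpath. Everything in it is an assumption for illustration: EVENT_LOOP is a single-threaded executor standing in for one Netty EventLoop, BLOCKING_POOL and its size are arbitrary, and processPacket simulates the slow database call with a sleep. It is not the actual SomeService or Netty API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class OffloadSketch {

    // Creates daemon threads with a fixed name so we can see where work runs.
    static ThreadFactory named(String name) {
        return r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        };
    }

    // Stand-in for one Netty EventLoop thread: it must never block.
    static final ExecutorService EVENT_LOOP =
            Executors.newSingleThreadExecutor(named("event-loop"));

    // Separate pool for blocking business logic; the size 4 is arbitrary.
    static final ExecutorService BLOCKING_POOL =
            Executors.newFixedThreadPool(4, named("blocking-pool"));

    // Stand-in for someService.processPacket(...): simulates a slow DB call.
    static String processPacket(String msg) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return msg + " handled on " + Thread.currentThread().getName();
    }

    // What channelRead would do: hand the blocking call to the pool and return
    // at once; the "write" is scheduled back on the event loop when it is done.
    static CompletableFuture<Void> channelRead(String msg, List<String> written) {
        return CompletableFuture
                .supplyAsync(() -> processPacket(msg), BLOCKING_POOL)
                .thenAcceptAsync(
                        res -> written.add(res + ", written on " + Thread.currentThread().getName()),
                        EVENT_LOOP);
    }

    // Two "clients" share one event loop; neither waits for the other's DB call.
    static List<String> handleTwoClients() {
        List<String> written = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> a = CompletableFuture
                .supplyAsync(() -> channelRead("a", written), EVENT_LOOP)
                .thenCompose(f -> f);
        CompletableFuture<Void> b = CompletableFuture
                .supplyAsync(() -> channelRead("b", written), EVENT_LOOP)
                .thenCompose(f -> f);
        CompletableFuture.allOf(a, b).join();
        return written;
    }

    public static void main(String[] args) {
        handleTwoClients().forEach(System.out::println);
    }
}
```

Each printed line shows the request being handled on the blocking-pool thread and written on the event-loop thread; the order of the two lines can vary. In a real handler the same shape would apply: submit someService.processPacket(...) to the pool and call ctx.writeAndFlush(...) from the completion callback, which Netty allows from any thread. Note that a multi-threaded pool like this does not by itself preserve the order of one client's requests. For the first option, ChannelPipeline has an addLast(EventExecutorGroup, String, ChannelHandler) overload that runs the handler's callbacks on the given group instead of the EventLoop.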