Search code examples
nettynetty4

Netty client auto reconnection


I have a Netty client with connect && auto reconnect implemented in the following manner. The code has been working fine for many years.

Bootstrap initialisation

Bootstrap bootstrap = new Bootstrap;

bootstrap.group(new NioEventLoopGroup(NUM_OF_WORKER_THREADS, new NamedThreadFactory(client.hostname+"-%d")))
bootstrap.channel(NioSocketChannel.class)
    .handler(new MyChannelInitializer(sslContext, client))
    .option(ChannelOption.SO_KEEPALIVE, true)
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);

Actual Connection logic

public void connect() { 
    try {
        ChannelFuture cf = bootstrap.connect(hostname, port).sync().await();
    } catch (Exception e) {
        if (group != null && !group.isShutdown()) {
            logger.error("Shutting down Event loop group: {}, host: {}" , group, hostname);
            group.shutdownGracefully();
        }
        throw new Exception("Connection failed to " + hostname, e);
    }
}

ReConnect

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    super.channelUnregistered(ctx);
    ctx.channel().close();
    //Connect again
    connect();
}

The reconnect trigger is hooked in channel unregistered callback. Recently observed a strange issue where connection went into close_wait state and thread is stuck forever. Why connect timeout (3 sec) is not triggered?

Here is the thread dump for the specific callback processing.

I suspect the root cause here is due to invoking blocking methods (sync and await) in callback handler threads. is there any deadlock?

"server-callback-worker-1" #214 prio=5 os_prio=0 cpu=1322.64ms elapsed=83237.37s tid=0x00007f1b0c005800 nid=0x1a9e89 in Object.wait()  [0x00007f1b5c1fd000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait([email protected]/Native Method)
    - waiting on <no object reference available>
    at java.lang.Object.wait([email protected]/Object.java:328)
    at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254)
    - waiting to re-lock in wait() <0x0000000461b0aba8> (a io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:131)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:30)
    at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:405)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:119)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:30)
    at com.example.Client.connect(Client.java:113)
    at com.example.Client.reconnect(Client.java:141)
    at com.example.handler.ClientHandler.channelUnregistered(ClientHandler.java:72)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:219)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:195)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:188)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelUnregistered(DefaultChannelPipeline.java:1388)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:215)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:195)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:821)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:821)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run([email protected]/Thread.java:829)

I tried refactoring the reconnect logic using a dedicated executor rather than processing in callback handler threads. is this correct approach? Do we need to use channel EventLoop to schedule the reconnect?

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    super.channelUnregistered(ctx);
    ctx.channel().close();
    client.setConnected(false);
    scheduledExecutorService.schedule(this::reconnectAttempt, 3, TimeUnit.SECONDS);
}

Solution

  • You should not call await() or sync() from threads in the event loop. This caused a dead lock to form in your case.

    Instead of calling await, add a listener to the promise returned by the bootstrap hat has your error handling code.