Search code examples
javamultithreadingnettyscheduledexecutorservice

Netty and Scheduled Executor Service


I'm trying to create a TCP server that read data periodically from a database (Redis) and send it to the appropriate client.

However, since I'm pretty new to Netty, I don't know how could I schedule this. I do know that I need to use a Scheduled Executor Service like this:

ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
e.scheduleAtFixedRate(() -> {
    System.out.println("Calling...");
    // Do something
}, 1, 1, TimeUnit.SECONDS);

However, when I tried to put that in the server code, It's only calling the method once. I've tried to put that in different place but still can't seem to get it right. What should I do?

Here's the code of the server:

package com.example.test.app;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Server {

    public static void main(String[] args) throws Exception
    {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        final ServerHandler handler = new ServerHandler();

        try {

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception
                {
                    ch.pipeline().addLast(handler);
                }

            });
            b.option(ChannelOption.SO_BACKLOG, 128);
            b.childOption(ChannelOption.SO_KEEPALIVE, true);

            ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
            e.scheduleAtFixedRate(() -> {
                System.out.println("Calling...");
                handler.saySomething();
            }, 1, 1, TimeUnit.SECONDS);

            ChannelFuture f = b.bind(1337).sync();
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

And here's the server handler:

package com.example.test.app;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerHandler extends ChannelInboundHandlerAdapter {

    private ChannelHandlerContext ctx;

    @Override
    public void channelActive(ChannelHandlerContext ctx)
    {
        this.ctx = ctx;
        System.out.println("Someone's connedted!");
    }

    public void saySomething()
    {
        final ChannelFuture f = ctx.writeAndFlush("Sup!");
        f.addListener((ChannelFutureListener) (ChannelFuture future) -> {
            System.out.println("Something has been said!");
        });
    }

}

Solution

  • The method saySomething() generates NullPointerException for calling final ChannelFuture f = ctx.writeAndFlush("Sup!"); while ctx is null. EventExecutorGroup.scheduleAtFixedRate javadoc description says that "If any execution of the task encounters an exception, subsequent executions are suppressed". So this is why you get is called only once...

    Also, seems like Netty allows you to re-use a handler instance for different pipeline instances only if you annotate this handler's class as @Sharable. Otherwise, it will throw exception. If your handler is stateless (which is not your case, as yours has the ctx member) then you should annotate it as @Sharable and re-use it to all created pipelines. If it is stateful, create a new instance for every new pipeline (new client connection).

    Finally, to schedule your task for each connected client you can use the executor which can be referenced by the ctx of the connected client's channel (by default, as in your case, the channel's EventLoop) on your channelActive() implementation. This executor implements ScheduledExecutorService, so you have also scheduleAtFixedRate. Take a look at my version of your code and see if it suits you.

    Server:

    package com.example.test.app;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class Server {
    
        public static void main(String[] args) throws Exception
        {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
    
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup);
                b.channel(NioServerSocketChannel.class);
                b.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception
                    {
                        ch.pipeline().addLast(new ServerHandler());
                    }
    
                });
                b.option(ChannelOption.SO_BACKLOG, 128);
                b.childOption(ChannelOption.SO_KEEPALIVE, true);
    
    //            ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
    //            e.scheduleAtFixedRate(() -> {
    //                System.out.println("Calling...");
    //                handler.saySomething();
    //            }, 1, 1, TimeUnit.SECONDS);
    
                ChannelFuture f = b.bind(1337).sync();
                f.channel().closeFuture().sync();
    
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    
    }
    

    ServerHandler:

    package com.example.test.app;
    
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.concurrent.ScheduledFuture;
    
    import java.util.concurrent.TimeUnit;
    
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        private ScheduledFuture sf;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx)
        {
            System.out.println("Someone's connedted! "+ctx.channel());
            sf = ctx.executor().scheduleAtFixedRate(() -> {
                System.out.println("Calling...");
                saySomething(ctx);
            }, 1, 1, TimeUnit.SECONDS);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            System.out.println("Someone's disconnected! "+ctx.channel());
            sf.cancel(false);
        }
    
        private void saySomething(ChannelHandlerContext ctx)
        {
                final ChannelFuture f = ctx.writeAndFlush("Sup!");
                f.addListener((ChannelFutureListener) (ChannelFuture future) -> {
                    System.out.println("Something has been said!");
                });
        }
    
    }