Search code examples
javamultithreadingionettydata-synchronization

how to add ObjectDecoder to netty server


The server code is from the netty QOTM (Quote Of The Moment) example:

package net.bounceme.dur.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.util.logging.Logger;

public final class Server {

    private static final Logger log = Logger.getLogger(Server.class.getName());

    public static void main(String[] args) throws InterruptedException {
        MyProps p = new MyProps();
        int port = p.getServerPort();
        new Server().pingPong(port);
    }

    private void pingPong(int port) throws InterruptedException {
        log.fine("which handler?");
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new ServerDatagramHandler());
            b.bind(port).sync().channel().closeFuture().await();
        } finally {
            group.shutdownGracefully();
        }
    }
}

Here is the DatagramPacket handler:

package net.bounceme.dur.netty;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import java.util.Random;
import java.util.logging.Logger;

public class ServerDatagramHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    private static final Logger log = Logger.getLogger(ServerDatagramHandler.class.getName());
    private static final Random random = new Random();

    public ServerDatagramHandler() {
        log.info("..started..");
    }

    // Quotes from Mohandas K. Gandhi:
    private static final String[] quotes = {
        "Where there is love there is life.",
        "First they ignore you, then they laugh at you, then they fight you, then you win.",
        "Be the change you want to see in the world.",
        "The weak can never forgive. Forgiveness is the attribute of the strong.",};

    private static String nextQuote() {
        int quoteId;
        synchronized (random) {
            quoteId = random.nextInt(quotes.length);
        }
        return quotes[quoteId];
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
        System.err.println(packet);
        if ("QOTM?".equals(packet.content().toString(CharsetUtil.UTF_8))) {
            ctx.write(new DatagramPacket(
                    Unpooled.copiedBuffer("QOTM: " + nextQuote(), CharsetUtil.UTF_8), packet.sender()));
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.severe(cause.toString());
    }
}

which I would like to switch to a Quote Handler:

package net.bounceme.dur.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Random;
import java.util.logging.Logger;
import net.bounceme.dur.jdbc.Quote;

public class ServerQuoteHandler extends SimpleChannelInboundHandler<Quote> {

    private static final Logger log = Logger.getLogger(ServerQuoteHandler.class.getName());
    private static final Random random = new Random();

    public ServerQuoteHandler() {
        log.info("..started..");
    }

    // Quotes from Mohandas K. Gandhi:
    private static final String[] quotes = {
        "Where there is love there is life.",
        "First they ignore you, then they laugh at you, then they fight you, then you win.",
        "Be the change you want to see in the world.",
        "The weak can never forgive. Forgiveness is the attribute of the strong.",};

    private static String nextQuote() {
        int quoteId;
        synchronized (random) {
            quoteId = random.nextInt(quotes.length);
        }
        return quotes[quoteId];
    }

    @Override
    protected void channelRead0(ChannelHandlerContext chc, Quote quote) throws Exception {
        log.info(quote.toString());
        chc.writeAndFlush(new Quote(nextQuote()));
    }

}

For my purposes, Quote is just a String wrapper, with a single field, and toString returns the quote. It implements Serializable, of course, and uses serialVersionUID.

When I look at the ping-pong example, I don't see where to add ObjectEncoder on the server.

A blog has this snippet:

 // Set up the pipeline factory.
 bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  public ChannelPipeline getPipeline() throws Exception {
   return Channels.pipeline(
    new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
    new DateHandler()
   );
  };
 });

but, how do I implement that into the QOTM server? I'm going through Netty in Action, but haven't found the relevant text explaining this yet. Neither ObjectEncoder nor ObjectDecoder appear in the text of the book..?

see also:

How to send an object with Netty?


Solution

  • I add the encoder and decoder like this to send various POJOs:

    Client:

            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ObjectEncoder());
                ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                ch.pipeline().addLast(customHandler1);
                ch.pipeline().addLast(customHandler2);
                ch.pipeline().addLast(customHandler3);
            }
        });
    

    Server:

            bootstrap.option(ChannelOption.SO_REUSEADDR, true);
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                                ch.pipeline().addLast(new ObjectEncoder());
                                ch.pipeline().addLast(customHandler1);
                                ch.pipeline().addLast(customHandler2);
                                ch.pipeline().addLast(customHandler3);
                            }
                        });