Search code examples
javanetwork-programmingnettynio

How to pass data between Channels in netty?


I am writing a simple routing application. The idea is that I have servers or source nodes that receive transient clients connections that last for a period of x time. The messages received are decoded and then sent to a corresponding sink node or client that is/are already open depending on the details of the message. The Router class registers all channels and attemps to save them in maps so that it can filter and worj out the destination of the message. Once I get the destination, I should then be able to pick the actual sink node (could be of transient of persistent nature depending on the configurations) and send data to that channel wait for a response and then send it back to the originator. I'd like to know first if my implementation using netty is in the right direction ? and how can I pass a message received from any of the servers and send it to any of the clients and respond back to the originating source node ?

Below is my source code : It will / should give you an idea of what I am up to :Kindly use code examples in your explanation .

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import org.jboss.netty.bootstrap.ClientBootstrap;
    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.ChannelFactory;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.ChildChannelStateEvent;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelHandler;
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

    /*
     * @author Kimathi
     */

    public class Service {

        private Nodes nodes;

        public void start(){

            nodes = new Nodes();
            nodes.addSourceNodes(new SourceNodes()).
                  addSinkNodes(new SinkNodes()).
                  addConfigurations(new Configurations()).
                  boot();
        }

        public void stop(){

            nodes.stop();
        }

        public static void main(String [] args){

            new Service().start();
        }

    }

    class Nodes {

       private SourceNodes sourcenodes;

       private SinkNodes sinknodes ;

       private Configurations configurations;

       public Nodes addConfigurations(Configurations configurations){

           this.configurations = configurations;

           return this;
       }

       public Nodes addSourceNodes(SourceNodes sourcenodes){

           this.sourcenodes = sourcenodes;

           return this;
       }

       public Nodes addSinkNodes(SinkNodes sinknodes){

           this.sinknodes = sinknodes;

           return this;
       }

       public void boot(){

          Router router = new Router(configurations);

          sourcenodes.addPort(8000).
                      addPort(8001).
                      addPort(8002);
          sourcenodes.addRouter(router);
          sourcenodes.boot() ;

          sinknodes.addRemoteAddress("127.0.0.1", 6000).
                    addRemoteAddress("127.0.0.1", 6001).
                    addRemoteAddress("127.0.0.1", 6002);
          sinknodes.addRouter(router);
          sinknodes.boot();

       }

       public void stop(){

           sourcenodes.stop();

           sinknodes.stop();
       }

    } 

    final class SourceNodes implements Bootable , Routable {

        private List <Integer> ports = new ArrayList();

        private ServerBootstrap serverbootstrap;

        private Router router;

        @Override
        public void addRouter(final Router router){

            this.router = router;
        }

        public SourceNodes addPort(int port){

            this.ports.add(port);

            return this;
        }

        @Override
        public void boot(){

            this.initBootStrap();

            this.serverbootstrap.setOption("child.tcpNoDelay", true);
            this.serverbootstrap.setOption("child.keepAlive", true);
            this.serverbootstrap.setPipelineFactory(new ChannelPipelineFactory() {

                @Override
                public ChannelPipeline getPipeline() throws Exception {

                    return Channels.pipeline(new SourceHandler(router));
                }
            });



            for(int port:this.ports){
                this.serverbootstrap.bind(new InetSocketAddress(port));
            }
        }

        @Override
        public void stop(){

            this.serverbootstrap.releaseExternalResources();

        }

        private void initBootStrap(){

            ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(),Executors.newCachedThreadPool());

            this.serverbootstrap = new ServerBootstrap(factory);
        }
    }

    final class SinkNodes implements Bootable , Routable {

        private List<SinkAddress> addresses= new ArrayList();

        private ClientBootstrap clientbootstrap;

        private Router router;

        @Override
        public void addRouter(final Router router){

            this.router = router;

        }

        public SinkNodes addRemoteAddress(String hostAddress,int port){

            this.addresses.add(new SinkAddress(hostAddress,port));

            return this;
        }

        @Override
        public void boot(){

            this.initBootStrap();

            this.clientbootstrap.setOption("tcpNoDelay", true);
            this.clientbootstrap.setOption("keepAlive", true);
            this.clientbootstrap.setPipelineFactory(new ChannelPipelineFactory() {

                @Override
                public ChannelPipeline getPipeline() throws Exception {

                    return Channels.pipeline(new SinkHandler(router));
                }
            });

            for(SinkAddress address:this.addresses){

                this.clientbootstrap.connect(new InetSocketAddress(address.hostAddress(),address.port()));
            }
        }

        @Override
        public void stop(){

            this.clientbootstrap.releaseExternalResources();
        }

        private void initBootStrap(){

            ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(),Executors.newCachedThreadPool());

            this.clientbootstrap = new ClientBootstrap(factory);
        }  

        private class SinkAddress {

            private final String hostAddress;
            private final int port;

            public SinkAddress(String hostAddress, int port) {
                this.hostAddress   = hostAddress;
                this.port = port;
            }

            public String hostAddress()   { return this.hostAddress; }
            public int port() { return this.port; }
        }
    }

    class SourceHandler extends SimpleChannelHandler {

        private Router router;

        public SourceHandler(Router router){

            this.router = router;
        }

        @Override
        public void childChannelOpen(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception {

            System.out.println("child is opened");
        }

        @Override
        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

            System.out.println("child is closed");
        }

        @Override
        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {


                System.out.println("Server is opened");

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {

            System.out.println(e.getCause());
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {


            System.out.println("channel received message");

        }
    }

    class SinkHandler extends SimpleChannelHandler {

        private Router router;

        public SinkHandler(Router router){

            this.router = router;
        }

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

            System.out.println("Channel is connected");
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {

            System.out.println(e.getCause());
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

            System.out.println("channel received message");

        }
    }

    final class Router {

        private Configurations configurations;

        private Map sourcenodes = new HashMap();

        private Map Sinknodes = new HashMap();

        public Router(){}

        public Router(Configurations configurations){

            this.configurations = configurations;
        }

        public synchronized boolean submitSource(ChannelHandlerContext ctx , MessageEvent e){

            boolean responded = false;

            return responded;
        }

        public synchronized boolean submitSink(ChannelHandlerContext ctx , MessageEvent e){

            boolean responded = false;

            return responded;
        }
    }

    final class Configurations {

        public Configurations(){}
    }

    interface Bootable {

        public abstract void boot();

        public abstract void stop();
    }

    interface Routable {

        public abstract void addRouter(Router router);
    }

Solution

  • The idea seems reasonable.

    The source channel handler can just write to the corresponding sink channel, using Channel#write(...), and vice versa on the reply.

    Of course, you also need a way to correlate the source channel with the reply, and how that is best done depends an the nature of the protocol. The best alternative, if possible, is to somehow encode the source channel id in the message to the sink channel (and also in the reply, of course).

    If that is not possible, you will somehow have to maintain the correlation. A FIFO queue per sink channel may be appropriate if the replies are guaranteed to pair up with the sent requests.