java nio socketchannel

How to communicate between non-blocking client and non-blocking server through only one SocketChannel


I'm trying to write a non-blocking client and a non-blocking server with these requirements:

  • The server just listens to clients and sends back to them whatever it has received.
  • The client can send a message to the server at any time, and can do so many times, but only through a single SocketChannel.

I read this tutorial: http://rox-xmlrpc.sourceforge.net/niotut/index.html. It implements the server in these steps:

  1. Initialize a Selector
  2. Wait for an acceptable SelectionKey from client
  3. Then wait for a readable SelectionKey and read data from it (when socketChannel.read returns -1, close the socketChannel)
  4. After that, send the data back to the client through a writable SelectionKey
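
The four steps above can be sketched roughly as follows. This is a minimal illustration, not the tutorial's code: the names EchoSketch, listen, serveOnce and echoAll are made up for this example, the server binds to any free loopback port, and the client in echoAll is a plain blocking SocketChannel used only to exercise the server. It sends two messages over one channel and waits for each echo.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class EchoSketch {
    // Steps 1-2: register a non-blocking server channel for OP_ACCEPT.
    static ServerSocketChannel listen(Selector selector) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port
        server.register(selector, SelectionKey.OP_ACCEPT);
        return server;
    }

    // One pass of the select loop: accept, read (step 3), write back (step 4).
    static void serveOnce(Selector selector) throws IOException {
        selector.select(200);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (!key.isValid()) continue;
            if (key.isAcceptable()) {
                SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
                if (ch == null) continue;
                ch.configureBlocking(false);
                // Each connection gets its own buffer as the key attachment.
                ch.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
            } else if (key.isReadable()) {
                SocketChannel ch = (SocketChannel) key.channel();
                ByteBuffer buf = (ByteBuffer) key.attachment();
                if (ch.read(buf) == -1) { ch.close(); continue; } // peer closed: close too
                buf.flip();
                key.interestOps(SelectionKey.OP_WRITE); // now wait until we can echo
            } else if (key.isWritable()) {
                SocketChannel ch = (SocketChannel) key.channel();
                ByteBuffer buf = (ByteBuffer) key.attachment();
                ch.write(buf);
                if (!buf.hasRemaining()) {
                    buf.clear();
                    key.interestOps(SelectionKey.OP_READ); // echo done: read again
                }
            }
        }
    }

    // Demo: run the loop in a daemon thread and send every message over ONE client channel.
    static String[] echoAll(String... messages) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = listen(selector);
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        Thread loop = new Thread(() -> {
            try { while (selector.isOpen()) serveOnce(selector); }
            catch (Exception e) { /* selector closed or I/O error: stop the loop */ }
        });
        loop.setDaemon(true);
        loop.start();
        String[] replies = new String[messages.length];
        try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
            for (int i = 0; i < messages.length; i++) {
                byte[] out = messages[i].getBytes(StandardCharsets.UTF_8);
                ByteBuffer w = ByteBuffer.wrap(out);
                while (w.hasRemaining()) client.write(w);
                ByteBuffer in = ByteBuffer.allocate(out.length);
                while (in.hasRemaining() && client.read(in) != -1) { /* wait for full echo */ }
                replies[i] = new String(in.array(), 0, in.position(), StandardCharsets.UTF_8);
            }
        } finally {
            selector.close();
            server.close();
        }
        return replies;
    }

    public static void main(String[] args) throws Exception {
        for (String reply : echoAll("hehe|||", "2 hehe|||")) {
            System.out.println("RECEIVE: " + reply);
        }
    }
}
```

Note that the server side never closes a connection after replying; it only closes when read returns -1, so the same channel stays usable for further messages.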

I also took a look at this tutorial: https://forums.oracle.com/forums/thread.jspa?threadID=1145909&tstart=2040, but it's too hard for me to understand.

So I wrote my own code based on ROX's tutorial. Here is my code: http://www.mediafire.com?o30yvtp5kqpya8b (it is almost entirely based on the code from ROX's tutorial). Because it's hard to post all of the code here, I uploaded my project to MediaFire; you can download it and import it into Eclipse to view the code easily.

You can run MyServer.java in package server, then MyClient.java in package client_test (ignore package client).

After running the server and the client, you will see that the server receives only the first message from the client, although it should receive 2 messages. I know there's something wrong with my implementation, but I don't know what it is or how to fix it.

Any suggestions on fixing my code, or on another solution that meets my requirements, will be appreciated. Thank you all.

OK, I will post the relevant parts of the code here:

My Client:

MyClient.java

public class MyClient implements Runnable {
    // The host:port combination to connect to
    private InetAddress hostAddress;
    private int port;

    // The selector we'll be monitoring
    private Selector selector;

    // The buffer into which we'll read data when it's available
    private ByteBuffer readBuffer = ByteBuffer.allocate(8);

    // A list of PendingChange instances
    private List pendingChanges = new LinkedList();

    // Maps a SocketChannel to a list of ByteBuffer instances
    private Map pendingData = new HashMap();

    // Maps a SocketChannel to a RspHandler
    private Map rspHandlers = Collections.synchronizedMap(new HashMap());


    private SocketChannel socket;
    private static MyResponseHandler handler;   

    public MyClient(InetAddress hostAddress, int port) throws IOException {
        this.hostAddress = hostAddress;
        this.port = port;
        this.selector = this.initSelector();

        handler = new MyResponseHandler();      
    }

    public void send(byte[] data, MyResponseHandler handler) throws IOException {
        System.out.println("------------ send() ---- BEGIN");

        // Register the response handler
        this.rspHandlers.put(socket, handler);  

        // And queue the data we want written
        synchronized (this.pendingData) {
            List queue = (List) this.pendingData.get(socket);
            if (queue == null) {
                queue = new ArrayList();
                this.pendingData.put(socket, queue);
            }
            queue.add(ByteBuffer.wrap(data));
        }

        // Finally, wake up our selecting thread so it can make the required changes
        this.selector.wakeup();
        System.out.println("------------ send() ---- END");
    }

    public void run() {
        while (true) {
            System.out.println("------------ while in run() ---- BEGIN");
            try {
                // Process any pending changes
                synchronized (this.pendingChanges) {
                    Iterator changes = this.pendingChanges.iterator();
                    while (changes.hasNext()) {
                        System.out.println("CHANGE!!!!!!!!!!!!!!!!!");
                        ChangeRequest change = (ChangeRequest) changes.next();
                        switch (change.type) {
                        case ChangeRequest.CHANGEOPS:
                            SelectionKey key = change.socket.keyFor(this.selector);
                            key.interestOps(change.ops);
                            break;
                        case ChangeRequest.REGISTER:
                            change.socket.register(this.selector, change.ops);
                            break;
                        }
                    }
                    this.pendingChanges.clear();
                }

                // Wait for an event on one of the registered channels
                this.selector.select();
                System.out.println("^^^^^^^^^^^^^^^^^^^^^^^^");
                // Iterate over the set of keys for which events are available
                Iterator selectedKeys = this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    System.out.println("There's something in this while loop");
                    SelectionKey key = (SelectionKey) selectedKeys.next();
                    selectedKeys.remove();

                    if (!key.isValid()) {
                        System.out.println("key is invalid");
                        continue;
                    }

                    // Check what event is available and deal with it
                    if (key.isConnectable()) {
                        this.finishConnection(key);
                    } else if (key.isReadable()) {
                        this.read(key);
                    } else if (key.isWritable()) {
                        this.write(key);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            System.out.println("------------ while in run() ---- END");
        }
    }

    private void read(SelectionKey key) throws IOException {
        System.out.println("------------ read() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
            numRead = socketChannel.read(this.readBuffer);
        } catch (IOException e) {
            // The remote forcibly closed the connection, cancel
            // the selection key and close the channel.
            key.cancel();
            socketChannel.close();
            return;
        }

        if (numRead == -1) {
            // Remote entity shut the socket down cleanly. Do the
            // same from our end and cancel the channel.
            key.channel().close();
            key.cancel();       
            return;
        }

        // Handle the response
        this.handleResponse(socketChannel, this.readBuffer.array(), numRead);
        System.out.println("------------ read() ---- END");
    }

    private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
        System.out.println("------------ handleResponse() ---- BEGIN");
        // Make a correctly sized copy of the data before handing it
        // to the client
        byte[] rspData = new byte[numRead];
        System.arraycopy(data, 0, rspData, 0, numRead);

        // Look up the handler for this channel
        MyResponseHandler handler = (MyResponseHandler) this.rspHandlers.get(socketChannel);

        // And pass the response to it
        if (handler.handleResponse(rspData)) {
            // The handler has seen enough, close the connection
            socketChannel.close();
            socketChannel.keyFor(this.selector).cancel();
        }
        System.out.println("------------ handleResponse() ---- END");
    }

    private void write(SelectionKey key) throws IOException {
        System.out.println("------------ write() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        synchronized (this.pendingData) {
            List queue = (List) this.pendingData.get(socketChannel);

            // Write until there's no more data ...
            while (!queue.isEmpty()) {
                ByteBuffer buf = (ByteBuffer) queue.get(0);
                socketChannel.write(buf);
                if (buf.remaining() > 0) {
                    // ... or the socket's buffer fills up
                    break;
                }
                queue.remove(0);
            }

            if (queue.isEmpty()) {
                // We wrote away all data, so we're no longer interested
                // in writing on this socket. Switch back to waiting for
                // data.
                key.interestOps(SelectionKey.OP_READ);
            }
        }
        System.out.println("------------ write() ---- END");
    }

    private void finishConnection(SelectionKey key) throws IOException {
        System.out.println("------------ finishConnection() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Finish the connection. If the connection operation failed
        // this will raise an IOException.
        try {
            socketChannel.finishConnect();
        } catch (IOException e) {
            // Cancel the channel's registration with our selector
            System.out.println(e);
            key.cancel();
            return;
        }

        // Register an interest in writing on this channel
        key.interestOps(SelectionKey.OP_WRITE);
        System.out.println("------------ finishConnection() ---- END");
    }

    private void initiateConnection() throws IOException {
        System.out.println("------------ initiateConnection() ---- BEGIN");
        // Create a non-blocking socket channel
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);

        // Kick off connection establishment
        socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));

        // Queue a channel registration since the caller is not the 
        // selecting thread. As part of the registration we'll register
        // an interest in connection events. These are raised when a channel
        // is ready to complete connection establishment.
        synchronized(this.pendingChanges) {
            this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
        }

        System.out.println("------------ initiateConnection() ---- END");
        socket = socketChannel;
    }

    private Selector initSelector() throws IOException {
        // Create a new selector
        return SelectorProvider.provider().openSelector();
    }

    public static void main(String[] args) {
        try {
            MyClient client = new MyClient(InetAddress.getByName("127.0.0.1"),
                    9090);
            Thread t = new Thread(client);
            t.setDaemon(true);
            t.start();

            // Start a new connection
            client.initiateConnection();            

            // 1st
            client.send("hehe|||".getBytes(), handler);
            System.out.println("SEND: " + "hehe|||");
            handler.waitForResponse();

            System.out.println("==========================================================");

            // 2nd
            client.send(("2 hehe|||").getBytes(), handler);
            System.out.println("SEND: " + "2 hehe|||");
            handler.waitForResponse();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MyResponseHandler.java

public class MyResponseHandler {
    private byte[] rsp = null;

    public synchronized boolean handleResponse(byte[] rsp) {
        this.rsp = rsp;
        this.notify();
        return true;
    }

    public synchronized void waitForResponse() {
        while(this.rsp == null) {
            try {
                System.out.println("--waiting...");
                this.wait();
                System.out.println("--done!!!");
            } catch (InterruptedException e) {}
        }

        System.out.println("RECEIVE: " + new String(this.rsp));

        /**
         *  Set @rsp = null to let the block inside the above while loop
         *  will be run again 
         */
        rsp = null;
    }
}

MyServer.java

public class MyServer implements Runnable {
    // CONSTANT
    private final static int BUFFER_SIZE = 8;

    // The host:port combination to listen on
    private InetAddress hostAddress;
    private int port;

    // The channel on which we'll accept connections
    private ServerSocketChannel serverChannel;

    // The selector we'll be monitoring
    private Selector selector;

    // The buffer into which we'll read data when it's available
    private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

    private RequestCollector requestCollector;

    // A list of PendingChange instances
    private List<ChangeRequest> pendingChanges = 
            new LinkedList<ChangeRequest>();

    // Maps a SocketChannel to a list of ByteBuffer instances
    private Map<SocketChannel, List<ByteBuffer>> pendingData = 
            new HashMap<SocketChannel, List<ByteBuffer>>();

    public MyServer(InetAddress hostAddress, int port, 
            RequestCollector requestCollector) throws IOException {
        this.hostAddress = hostAddress;
        this.port = port;
        this.selector = this.initSelector();
        this.requestCollector = requestCollector;
    }

    public void send(SocketChannel socket, byte[] data) {
        System.out.println("------------ send() ---- BEGIN");
        synchronized (this.pendingChanges) {
            // Indicate we want the interest ops set changed
            this.pendingChanges.add(new ChangeRequest(socket,
                    ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));

            // And queue the data we want written
            synchronized (this.pendingData) {
                List<ByteBuffer> queue = 
                        (List<ByteBuffer>) this.pendingData.get(socket);
                if (queue == null) {
                    queue = new ArrayList<ByteBuffer>();
                    this.pendingData.put(socket, queue);
                }
                queue.add(ByteBuffer.wrap(data));
            }
        }

        // Finally, wake up our selecting thread so it can make the required 
        // changes
        this.selector.wakeup();
        System.out.println("------------ send() ---- END");
    }

    public void run() {
        while (true) {
            System.out.println("------------ while in run() ---- BEGIN");
            try {
                // Process any pending changes
                synchronized (this.pendingChanges) {
                    Iterator<ChangeRequest> changes = 
                            this.pendingChanges.iterator();
                    while (changes.hasNext()) {
                        System.out.println("CHANGE!!!!!!!!!!!!!!!!!");
                        ChangeRequest change = (ChangeRequest) changes.next();
                        switch (change.type) {
                        case ChangeRequest.CHANGEOPS:
                            SelectionKey key = 
                                change.socket.keyFor(this.selector);
                            key.interestOps(change.ops);
                        }
                    }
                    this.pendingChanges.clear();
                }

                // Wait for an event on one of the registered channels
                this.selector.select();
                System.out.println("^^^^^^^^^^^^^^^^^^^^^^^^");

                // Iterate over the set of keys for which events are available
                Iterator<SelectionKey> selectedKeys = 
                        this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    System.out.println("There's something in this while loop");
                    SelectionKey key = (SelectionKey) selectedKeys.next();
                    selectedKeys.remove();

                    if (!key.isValid()) {
                        System.out.println("key is invalid");
                        continue;
                    }

                    // Check what event is available and deal with it
                    if (key.isAcceptable()) {
                        this.accept(key);
                    } else if (key.isReadable()) {
                        this.read(key);
                    } else if (key.isWritable()) {
                        this.write(key);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("------------ while in run() ---- END");
        }
    }

    private void accept(SelectionKey key) throws IOException {
        System.out.println("------------ accept() ---- BEGIN");
        // For an accept to be pending the channel must be a server socket 
        // channel.
        ServerSocketChannel serverSocketChannel = 
                (ServerSocketChannel) key.channel();

        // Accept the connection and make it non-blocking
        SocketChannel socketChannel = serverSocketChannel.accept();
        //Socket socket = socketChannel.socket();
        socketChannel.configureBlocking(false);

        // Register the new SocketChannel with our Selector, indicating
        // we'd like to be notified when there's data waiting to be read
        SelectionKey readKey = 
                socketChannel.register(this.selector, SelectionKey.OP_READ);

        // Attach a StringBuilder to this SocketChannel
        readKey.attach( new StringBuilder() );

        // DEBUG    
        System.out.println(socketChannel.socket().getInetAddress() + " - "
                + socketChannel.socket().getPort());
        System.out.println("------------ accept() ---- END");
    }

    private void read(SelectionKey key) throws IOException {
        System.out.println("------------ read() ---- BEGIN");
        // Get socket channel
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Get attached StringBuilder
        StringBuilder currentMessage = (StringBuilder) key.attachment();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
            numRead = socketChannel.read(this.readBuffer);
        } catch (IOException e) {
            // The remote forcibly closed the connection, cancel
            // the selection key and close the channel.
            key.cancel();
            socketChannel.close();
            return;
        }

        if (numRead == -1) {
            // Remote entity shut the socket down cleanly. Do the
            // same from our end and cancel the channel.
            key.cancel();
            return;
        }

        // Hand the data off to our requestCollector thread
        this.requestCollector.processData(this, socketChannel, 
                this.readBuffer.array(), numRead, currentMessage);
        System.out.println("------------ read() ---- END");
    }

    private void write(SelectionKey key) throws IOException {
        System.out.println("------------ write() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        synchronized (this.pendingData) {
            List<ByteBuffer> queue = 
                    (List<ByteBuffer>) this.pendingData.get(socketChannel);

            // Write until there's no more data ...
            while (!queue.isEmpty()) {
                ByteBuffer buf = (ByteBuffer) queue.get(0);
                socketChannel.write(buf);
                if (buf.remaining() > 0) {
                    // ... or the socket's buffer fills up
                    break;
                }
                queue.remove(0);
            }

            if (queue.isEmpty()) {
                // We wrote away all data, so we're no longer interested
                // in writing on this socket. Switch back to waiting for
                // data.
                key.interestOps(SelectionKey.OP_READ);
            }
        }
        System.out.println("------------ write() ---- END");
    }

    private Selector initSelector() throws IOException {
        System.out.println("------------ initSelector() ---- BEGIN");
        // Create a new selector
        Selector socketSelector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, 
                this.port);
        serverChannel.socket().bind(isa);

        // Register the server socket channel, indicating an interest in 
        // accepting new connections
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        System.out.println("------------ initSelector() ---- END");
        return socketSelector;
    }

    public static void main(String[] args) {
        try {
            RequestCollector requestCollector = new RequestCollector();
            new Thread(requestCollector).start();
            new Thread(new MyServer(null, 9090, requestCollector)).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

If you want to see more code, you can download the zip file. When I run the server and the client, the debug output is:

Server

------------ initSelector() ---- BEGIN
------------ initSelector() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ accept() ---- BEGIN
/127.0.0.1 - 46553
------------ accept() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ read() ---- BEGIN
------------ read() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
RECEIVE: hehe|||
------------ send() ---- BEGIN
------------ send() ---- END
^^^^^^^^^^^^^^^^^^^^^^^^
------------ while in run() ---- END
------------ while in run() ---- BEGIN
SEND: hehe|||
CHANGE!!!!!!!!!!!!!!!!!
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ write() ---- BEGIN
------------ write() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ read() ---- BEGIN
------------ while in run() ---- END
------------ while in run() ---- BEGIN

Client

------------ initiateConnection() ---- BEGIN
------------ initiateConnection() ---- END
------------ send() ---- BEGIN
------------ send() ---- END
SEND: hehe|||
--waiting...
------------ while in run() ---- BEGIN
CHANGE!!!!!!!!!!!!!!!!!
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ finishConnection() ---- BEGIN
------------ finishConnection() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ write() ---- BEGIN
------------ write() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ read() ---- BEGIN
------------ handleResponse() ---- BEGIN
--done!!!
RECEIVE: hehe|||
==========================================================
------------ send() ---- BEGIN
------------ send() ---- END
SEND: 2 hehe|||
--waiting...
------------ handleResponse() ---- END
------------ read() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
------------ while in run() ---- END
------------ while in run() ---- BEGIN

Solution

  • I've just commented on most of this code in another recent post. As for your own code, response handlers should not loop and sleep; they should either do a blocking read with a timeout or loop calling select() with a timeout.
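
Two details in the posted listing also seem worth checking. This is a reading of the code, not something the answer above states. First, MyResponseHandler.handleResponse always returns true, and MyClient.handleResponse closes the channel whenever the handler returns true, so the channel appears to be closed after the first reply. Second, MyClient.send only queues data and wakes the selector, while write() switches the key back to OP_READ once the queue is empty, so nothing requests OP_WRITE again before the second message.

The second option from the answer, looping on select() with a timeout, might look roughly like the sketch below. OneChannelClient, request and demo are hypothetical names, the reply is assumed to be an echo of the same length, and the small blocking echo server inside demo exists only so that the example runs on its own.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class OneChannelClient implements AutoCloseable {
    private final Selector selector;
    private final SocketChannel channel;
    private final SelectionKey key;

    public OneChannelClient(InetSocketAddress server, long timeoutMs) throws IOException {
        selector = Selector.open();
        channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(server);
        key = channel.register(selector, SelectionKey.OP_CONNECT);
        long deadline = System.currentTimeMillis() + timeoutMs;
        // finishConnect() returns false on a non-blocking channel until the connection is ready.
        while (!channel.finishConnect()) {
            long left = deadline - System.currentTimeMillis();
            if (left <= 0) throw new IOException("connect timed out");
            selector.select(left);
            selector.selectedKeys().clear();
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    // Sends one message and waits up to timeoutMs for an echo of the same length.
    public String request(String message, long timeoutMs) throws IOException {
        ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
        ByteBuffer in = ByteBuffer.allocate(out.remaining());
        key.interestOps(SelectionKey.OP_WRITE); // ask for write readiness on EVERY send
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (in.hasRemaining()) {
            long left = deadline - System.currentTimeMillis();
            if (left <= 0) throw new IOException("timed out waiting for reply");
            selector.select(left);
            selector.selectedKeys().clear();
            // The channel is non-blocking, so these calls return at once if nothing can be done.
            if (out.hasRemaining()) {
                channel.write(out);
                if (!out.hasRemaining()) key.interestOps(SelectionKey.OP_READ);
            } else if (channel.read(in) == -1) {
                throw new IOException("server closed the connection");
            }
        }
        return new String(in.array(), StandardCharsets.UTF_8);
    }

    @Override
    public void close() throws IOException {
        selector.close();
        channel.close();
    }

    // Demo only: a tiny blocking echo server on a free port, then two requests on one channel.
    static String[] demo() throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        Thread echo = new Thread(() -> {
            try (SocketChannel peer = server.accept()) {
                ByteBuffer buf = ByteBuffer.allocate(64);
                while (peer.read(buf) != -1) {
                    buf.flip();
                    while (buf.hasRemaining()) peer.write(buf);
                    buf.clear();
                }
            } catch (IOException ignored) { /* demo server: stop on any error */ }
        });
        echo.setDaemon(true);
        echo.start();
        InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
        try (OneChannelClient client = new OneChannelClient(addr, 2000)) {
            return new String[] { client.request("hehe|||", 2000), client.request("2 hehe|||", 2000) };
        } finally {
            server.close();
        }
    }

    public static void main(String[] args) throws Exception {
        for (String reply : demo()) System.out.println("RECEIVE: " + reply);
    }
}
```

In this sketch the calling thread does the waiting itself inside select(), so no separate handler object has to wait and be notified, and the channel is closed only when the caller closes the client.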