NIO SocketChannel saying there is no data when there is (or selector is not informing me)


I have a working client and server that can successfully connect and send messages to each other using NIO.

Right now my only confusion is how I'm supposed to continue reading when socketChannel.read() returns zero.

I have a protocol in which the first 4 bytes give the number of bytes that follow. Even knowing that count, I'm running into a potential issue.
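
For illustration, here is a minimal sketch of how such a length-prefixed frame could be accumulated across several non-blocking reads. `FrameReader`, `poll` and `MAX_FRAME` are hypothetical names that do not come from the question, and a big-endian 4-byte length is assumed. The demo feeds the reader from in-memory channels rather than a real socket.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical helper: accumulates one length-prefixed frame across many read() calls. */
public class FrameReader {
    private static final int MAX_FRAME = 1 << 20;              // assumed sanity limit
    private final ByteBuffer header = ByteBuffer.allocate(4);  // 4-byte big-endian length
    private ByteBuffer body;                                    // allocated once the length is known

    /**
     * Reads whatever is currently available. Returns the payload once the frame
     * is complete, or null if more data is needed (go back to the selector).
     */
    public byte[] poll(ReadableByteChannel ch) throws IOException {
        if (body == null) {
            if (ch.read(header) < 0) throw new IOException("peer closed mid-header");
            if (header.hasRemaining()) return null;             // header still incomplete
            header.flip();
            int length = header.getInt();
            header.clear();                                     // reset for the next frame
            if (length < 0 || length > MAX_FRAME) throw new IOException("bad frame length " + length);
            body = ByteBuffer.allocate(length);
        }
        if (body.hasRemaining() && ch.read(body) < 0) throw new IOException("peer closed mid-frame");
        if (body.hasRemaining()) return null;                   // payload still incomplete
        byte[] payload = body.array();
        body = null;                                            // next call starts a new frame
        return payload;
    }

    public static void main(String[] args) throws IOException {
        // One 12-byte frame delivered in two chunks, similar to the 5 / 7 split in the question.
        byte[] first = {0, 0, 0, 12, 1, 2, 3, 4, 5};
        byte[] second = {6, 7, 8, 9, 10, 11, 12};
        FrameReader reader = new FrameReader();
        byte[] a = reader.poll(Channels.newChannel(new ByteArrayInputStream(first)));
        byte[] b = reader.poll(Channels.newChannel(new ByteArrayInputStream(second)));
        System.out.println("after first chunk: " + (a == null ? "incomplete" : "complete"));
        System.out.println("after second chunk: " + b.length + " bytes");
    }
}
```

The buffers are sized to exactly what is still expected, so a `read()` that returns 0 here means "nothing more has arrived yet" rather than "the buffer is full".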

During testing, however, there are times when I read something like:

5 // Read 5 bytes when calling socketChannel.read()
0 // Read 0 bytes when calling socketChannel.read() immediately after

When I hit the zero, I assumed that I was done reading and needed to wait for more data to arrive.

When I do this, however, OP_READ doesn't seem to be triggered when I call selectNow() again later on. I checked the key and both its readyOps() and interestOps() are set to 1 (which is OP_READ), but the selector does not report that it's time to read again.
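
When inspecting keys like this, it can help to print the ops bitmask by name instead of as a raw number. A small sketch follows; `OpsNames` and `describe` are made-up names, while the `SelectionKey.OP_*` constants are the standard ones.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

public class OpsNames {
    /** Turns an ops bitmask into a readable string such as "READ|WRITE". */
    public static String describe(int ops) {
        List<String> names = new ArrayList<>();
        if ((ops & SelectionKey.OP_READ) != 0) names.add("READ");
        if ((ops & SelectionKey.OP_WRITE) != 0) names.add("WRITE");
        if ((ops & SelectionKey.OP_CONNECT) != 0) names.add("CONNECT");
        if ((ops & SelectionKey.OP_ACCEPT) != 0) names.add("ACCEPT");
        return names.isEmpty() ? "NONE" : String.join("|", names);
    }

    public static void main(String[] args) {
        // Typical use while debugging: describe(key.readyOps()) and describe(key.interestOps()).
        System.out.println(describe(SelectionKey.OP_READ));
        System.out.println(describe(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
    }
}
```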

I found that if I continue looping to read, I might get something like:

5 // socketChannel.read()
0 // socketChannel.read()
7 // socketChannel.read() (Done since I have all my bytes)
0
0
0
...

I'm confused here because this means one of:

  • There is no data there, so the 0 returned is legitimate, but then when the rest of the data comes in, the selector refuses to return the key with selectNow()

  • The data is all there, but read() returns 0 for some reason.

Am I supposed to re-register the channel after a selectNow() returns it as an active key? (Though I didn't have to when switching from OP_CONNECT to OP_READ... so I'm guessing not.) I feel like blindly spinning in a loop is dangerous and will waste processing cycles.

Am I just supposed to keep polling them? That leaves me confused about when OP_READ actually fires.
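
One related thing worth checking, although it is not confirmed as the cause in this question: a key stays in `selector.selectedKeys()` until it is removed explicitly, and `selectNow()` only counts keys whose ready set changed. The sketch below shows that behaviour using a `Pipe` in place of a socket; `SelectedKeyDemo` and `run` are hypothetical names.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectedKeyDemo {
    /** Returns three select results: fresh key, key left in the selected set, key after removal. */
    public static int[] run() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ);

            pipe.sink().write(ByteBuffer.wrap(new byte[]{1}));
            int first = selector.select(1000);   // the key becomes selected
            // Deliberately NOT removing the key from selector.selectedKeys() here.

            pipe.sink().write(ByteBuffer.wrap(new byte[]{2}));
            int stale = selector.selectNow();    // READ was already in the ready set, nothing new to report

            selector.selectedKeys().clear();     // what a select loop should do after handling its keys
            int fresh = selector.select(1000);   // unread data is still pending, so the key is reported again

            pipe.sink().close();
            pipe.source().close();
            return new int[]{first, stale, fresh};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] r = run();
        System.out.println("first=" + r[0] + " stale=" + r[1] + " fresh=" + r[2]);
    }
}
```

No re-registration is needed in either case. The usual pattern is to iterate over the selected keys and remove each one (or clear the set) once it has been handled, and to prefer a blocking `select()` over a `selectNow()` plus sleep loop when there is nothing else for the thread to do.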


Solution

  • This was due to an error on my part: I did not call .clear() on the ByteBuffer that I read into. Once the buffer is full (its position equals its limit), read() has no room to store anything and returns 0 even though the data has arrived.

    This example may also be useful to people who want to see how a simple client works (though with really bad exception handling). It was written as a quick and dirty test, so there is no guarantee it works properly and it may well have issues.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    
    public class Test {
    
        public static final int PORT = 22222;
    
        public static void main(String[] args) throws IOException {
            Thread s = new Thread(new Server());
            Thread c = new Thread(new Client());
            s.start();
            c.start();
        }
    }
    
    class Client implements Runnable {
    
        public Selector selector;
    
        public SocketChannel sc;
    
        public Client() throws IOException {
            selector = Selector.open();
        }
    
        @Override
        public void run() {
            try {
                sc = SocketChannel.open();
                sc.socket().setTcpNoDelay(true);
                sc.configureBlocking(false);
                SelectionKey k = sc.register(selector, SelectionKey.OP_CONNECT);
                boolean firstConnect = sc.connect(new InetSocketAddress("localhost", Test.PORT));
                if (firstConnect) {
                    System.out.println("Connected on first connect, de-registering OP_CONNECT");
                    k.interestOps(SelectionKey.OP_READ);
                }
    
                while (true) {
                    int keys = selector.selectNow();
                    if (keys > 0) {
                        for (SelectionKey key : selector.selectedKeys()) {
                            if (key.isConnectable()) {
                                boolean finishConnectResult = sc.finishConnect();
                                key.interestOps(SelectionKey.OP_READ);
                                System.out.println("Finished connection: " + finishConnectResult);
                            }
    
                            if (key.isReadable()) {
                                ByteBuffer bb = ByteBuffer.allocate(2);
                                int bytesRead = 0;
                                while ((bytesRead = sc.read(bb)) > 0) {
                                    bb.flip();
                                    System.out.println(bytesRead + " bytes read");
                                    System.out.println(bb.get() + ", " + bb.get());
                                    bb.clear(); // Reset position/limit for the next read. If this line is commented out, the buffer stays full and read() returns 0.
                                }
                                System.out.println("Last bytes read value = " + bytesRead);
                                System.exit(0);
                            }
                        }
                        selector.selectedKeys().clear(); // Handled keys are not removed automatically
                    }
    
                    Thread.sleep(5);
                }
            } catch (Exception e) { 
                e.printStackTrace();
                throw new RuntimeException(e); // keep the original cause
            }
        }
    }
    
    class Server implements Runnable {
    
        public Selector selector;
    
        public SocketChannel sc;
    
        public Server() throws IOException {
            selector = Selector.open();
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress(Test.PORT));
            ssc.register(selector, SelectionKey.OP_ACCEPT);
        }
    
        @Override
        public void run() {
            boolean notSentData = true;
            try {
                while (true) {
                    int keys = selector.selectNow();
                    if (keys > 0) {
                        for (SelectionKey key : selector.selectedKeys()) {
                            if (key.isAcceptable()) {
                                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                                sc = ssc.accept();
                                if (sc != null) {
                                    sc.configureBlocking(false);
                                    sc.socket().setTcpNoDelay(true); // Required in my application
                                    sc.register(selector, SelectionKey.OP_WRITE);
                                    System.out.println("Server accepted connection");
                                } else {
                                    System.out.println("Got null connection");
                                }
                            }
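                        }
                        selector.selectedKeys().clear(); // Otherwise the accept key stays selected and accept() keeps returning null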
                    }
    
                    if (sc != null && notSentData) {
                        ByteBuffer bb = ByteBuffer.allocate(4);
                        bb.put(new byte[]{ 1, 2, 3, -1});
                        bb.flip();
                        int wrote = sc.write(bb);
                        System.out.println("Wrote " + wrote + " bytes");
                        notSentData = false;
                    }
    
                    Thread.sleep(5);
                }
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException(e); // keep the original cause
            }
        }
    }
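
To see the buffer-state issue in isolation, here is a small sketch that reads 4 bytes through a 2-byte buffer, with and without `clear()` between reads. `ClearDemo` and `readTwice` are hypothetical names, and an in-memory channel stands in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class ClearDemo {
    /** Reads 4 bytes through a 2-byte buffer; returns the result of each read() call. */
    public static int[] readTwice(boolean clearBetweenReads) throws IOException {
        ReadableByteChannel ch =
                Channels.newChannel(new ByteArrayInputStream(new byte[]{1, 2, 3, -1}));
        ByteBuffer bb = ByteBuffer.allocate(2);

        int first = ch.read(bb);   // fills the buffer: position == limit == 2
        bb.flip();                 // position = 0, limit = 2, ready for get()
        bb.get();
        bb.get();                  // both bytes consumed: position == limit again

        if (clearBetweenReads) {
            bb.clear();            // position = 0, limit = capacity, room to read again
        }
        int second = ch.read(bb);  // with no room left, read() can only return 0
        return new int[]{first, second};
    }

    public static void main(String[] args) throws IOException {
        int[] without = readTwice(false);
        int[] with = readTwice(true);
        System.out.println("without clear(): " + without[0] + ", " + without[1]);
        System.out.println("with clear():    " + with[0] + ", " + with[1]);
    }
}
```

When the buffer may still hold unread bytes, `compact()` is the usual alternative to `clear()`, since it keeps the remaining bytes and makes room after them.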