I have a working client and server that can successfully connect and send messages to each other using NIO.
Right now my only confusion is how I'm supposed to continue reading when socketChannel.read() returns zero.
I have a protocol that sends the first 4 bytes as the number of incoming bytes to expect. Even with that amount, I'm running into a potential issue.
However, there are times where I might read something like:
5 // Read 5 bytes when calling socketChannel.read()
0 // Read 0 bytes when calling socketChannel.read() immediately after
When I hit the zero, I assumed that I was done reading and needed to wait for more data to arrive.
When I do this, however, OP_READ doesn't seem to be triggered when I call selectNow() again later on. I checked the key, and both its readyOps() and interestOps() are set to 1 (which is OP_READ), but the selector does not report that it's time to read again.
I found that if I continue looping to read, I might get something like:
5 // socketChannel.read()
0 // socketChannel.read()
7 // socketChannel.read() (Done since I have all my bytes)
0
0
0
...
I'm confused here because this means one of two things:
There is no data there, so the 0 is legitimate, but then when the rest of the data comes in, the selector refuses to return the key from selectNow().
The data is all there, but read() returns 0 for some reason.
Am I supposed to re-register the channel after a selectNow() returns it as an active key? (Though I didn't have to between switching from OP_CONNECT to OP_READ... so I'm guessing not). I feel like blindly circling in a loop is dangerous and will waste processing cycles.
Am I just supposed to keep polling them? That leaves me confused about when OP_READ actually fires.
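On the re-registration and polling questions, a small sketch may help. As far as the Selector Javadoc describes it, a key stays registered until it is cancelled or its channel or selector is closed, so no re-registration is needed. The selector does not remove keys from the selected-key set, though; that is left to the caller. The sketch below uses a Pipe in place of a socket so that it runs standalone, and it uses select(timeout) rather than selectNow() plus sleep so the thread is not spinning. The names SelectLoopSketch and drain are made up for illustration and are not part of the original program.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

public class SelectLoopSketch {

    // Waits up to timeoutMillis for readiness, handles each selected key once,
    // and returns the number of bytes read during this pass.
    static int drain(Selector selector, ByteBuffer buffer, long timeoutMillis)
            throws IOException {
        int total = 0;
        selector.select(timeoutMillis); // blocks until a key is ready or the timeout expires
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // the selector never removes selected keys itself
            if (key.isValid() && key.isReadable()) {
                ReadableByteChannel channel = (ReadableByteChannel) key.channel();
                buffer.clear(); // position = 0, limit = capacity
                int n;
                while ((n = channel.read(buffer)) > 0) {
                    total += n;
                    buffer.clear(); // make room before the next read
                }
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        try (Selector selector = Selector.open()) {
            // Registered once; the key stays registered across passes.
            pipe.source().register(selector, SelectionKey.OP_READ);
            ByteBuffer buffer = ByteBuffer.allocate(16);

            pipe.sink().write(ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5}));
            System.out.println("first pass read " + drain(selector, buffer, 1000) + " bytes");

            pipe.sink().write(ByteBuffer.wrap(new byte[] {6, 7, 8, 9, 10, 11, 12}));
            System.out.println("second pass read " + drain(selector, buffer, 1000) + " bytes");
        } finally {
            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```

In this sketch, a read() result of 0 simply ends the pass; the next select call reports the key again once more bytes arrive.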
This was due to an error on my part: I did not call .clear() on the ByteBuffer used for reading. After flip() and the get() calls, the buffer has no remaining space, so read() returns 0 even though the data has arrived.
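The effect can be reproduced without any sockets. The sketch below is an illustration rather than part of the original program: it assumes an in-memory channel built with Channels.newChannel over a ByteArrayInputStream, and the names ClearDemo and readTwice are hypothetical. It reads into a 2-byte buffer twice, with and without clear() in between.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class ClearDemo {

    // Reads twice into the same 2-byte buffer and returns both read() results.
    static int[] readTwice(boolean clearBetweenReads) throws IOException {
        ReadableByteChannel channel =
                Channels.newChannel(new ByteArrayInputStream(new byte[] {1, 2, 3, -1}));
        ByteBuffer bb = ByteBuffer.allocate(2);

        int first = channel.read(bb);  // fills the buffer: position == limit == 2
        bb.flip();                     // position = 0, limit = 2, ready for get()
        bb.get();
        bb.get();                      // position is 2 again, so remaining() == 0

        if (clearBetweenReads) {
            bb.clear();                // position = 0, limit = capacity
        }
        int second = channel.read(bb); // a buffer with no room accepts no bytes
        return new int[] {first, second};
    }

    public static void main(String[] args) throws IOException {
        int[] without = readTwice(false);
        int[] with = readTwice(true);
        System.out.println("without clear(): " + without[0] + ", " + without[1]); // 2, 0
        System.out.println("with clear():    " + with[0] + ", " + with[1]);       // 2, 2
    }
}
```

The second read returns 0 only when the buffer was left full, which matches the symptom described above. The same fix applies to the read loop in the full test program that follows.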
The test program below may also be useful to anyone who wants to see how a simple client works (though with really bad exception handling). It was written as a quick and dirty test, so there is no guarantee it works properly, and it likely has issues.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Test {
    public static final int PORT = 22222;

    public static void main(String[] args) throws IOException {
        Thread s = new Thread(new Server());
        Thread c = new Thread(new Client());
        s.start();
        c.start();
    }
}
class Client implements Runnable {
    public Selector selector;
    public SocketChannel sc;

    public Client() throws IOException {
        selector = Selector.open();
    }

    @Override
    public void run() {
        try {
            sc = SocketChannel.open();
            sc.socket().setTcpNoDelay(true);
            sc.configureBlocking(false);
            SelectionKey k = sc.register(selector, SelectionKey.OP_CONNECT);
            boolean firstConnect = sc.connect(new InetSocketAddress("localhost", Test.PORT));
            if (firstConnect) {
                System.out.println("Connected on first connect, de-registering OP_CONNECT");
                k.interestOps(SelectionKey.OP_READ);
            }
            while (true) {
                int keys = selector.selectNow();
                if (keys > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isConnectable()) {
                            boolean finishConnectResult = sc.finishConnect();
                            key.interestOps(SelectionKey.OP_READ);
                            System.out.println("Finished connection: " + finishConnectResult);
                        }
                        if (key.isReadable()) {
                            ByteBuffer bb = ByteBuffer.allocate(2);
                            int bytesRead = 0;
                            while ((bytesRead = sc.read(bb)) > 0) {
                                bb.flip();
                                System.out.println(bytesRead + " bytes read");
                                // Assumes both bytes arrived in the same read.
                                System.out.println(bb.get() + ", " + bb.get());
                                // Without clear() the buffer stays full and the next read() returns 0.
                                bb.clear();
                            }
                            System.out.println("Last bytes read value = " + bytesRead);
                            System.exit(0);
                        }
                    }
                    // The selector never removes selected keys itself.
                    selector.selectedKeys().clear();
                }
                Thread.sleep(5);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}
class Server implements Runnable {
    public Selector selector;
    public SocketChannel sc;

    public Server() throws IOException {
        selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(Test.PORT));
        ssc.register(selector, SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
        boolean notSentData = true;
        try {
            while (true) {
                int keys = selector.selectNow();
                if (keys > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isAcceptable()) {
                            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                            // accept() may return null; do not overwrite an existing connection with it.
                            SocketChannel accepted = ssc.accept();
                            if (accepted != null) {
                                sc = accepted;
                                sc.configureBlocking(false);
                                sc.socket().setTcpNoDelay(true); // Required in my application
                                sc.register(selector, SelectionKey.OP_WRITE);
                                System.out.println("Server accepted connection");
                            } else {
                                System.out.println("Got null connection");
                            }
                        }
                    }
                    // The selector never removes selected keys itself.
                    selector.selectedKeys().clear();
                }
                if (sc != null && notSentData) {
                    ByteBuffer bb = ByteBuffer.allocate(4);
                    bb.put(new byte[]{ 1, 2, 3, -1});
                    bb.flip();
                    int wrote = sc.write(bb);
                    System.out.println("Wrote " + wrote + " bytes");
                    notSentData = false;
                }
                Thread.sleep(5);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}
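Returning to the length-prefixed protocol from the question (4 bytes of length, then that many bytes of payload), here is one way the "5, then 0, then 7" situation could be handled: keep the partially filled buffers between calls and treat a read() result of 0 as "come back when the selector reports the key again". This is a sketch under assumptions, not the original application's code. FrameReader, poll, and TrickleChannel are hypothetical names, the length is assumed to be a big-endian signed int (ByteBuffer's default byte order), and TrickleChannel is only an in-memory stand-in for a non-blocking socket.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class FrameReader {
    private final ByteBuffer header = ByteBuffer.allocate(4);
    private ByteBuffer body; // allocated once the length prefix is complete

    // Reads whatever is available. Returns the payload once a frame is complete,
    // or null if more data is needed. State is kept between calls.
    public byte[] poll(ReadableByteChannel channel) throws IOException {
        if (body == null) {
            if (channel.read(header) < 0) throw new EOFException("closed before header");
            if (header.hasRemaining()) return null; // length prefix still incomplete
            header.flip();
            int length = header.getInt(); // assumption: big-endian signed int
            header.clear();               // ready for the next frame's prefix
            if (length < 0) throw new IOException("negative frame length: " + length);
            body = ByteBuffer.allocate(length);
        }
        if (body.hasRemaining() && channel.read(body) < 0) {
            throw new EOFException("closed in the middle of a frame");
        }
        if (body.hasRemaining()) return null; // 0 or partial read: wait for OP_READ
        byte[] payload = body.array();
        body = null;
        return payload;
    }

    // Stand-in for a non-blocking socket: bytes become readable only once released.
    static final class TrickleChannel implements ReadableByteChannel {
        private final ByteBuffer data;
        private int released; // absolute index up to which bytes may be read

        TrickleChannel(byte[] bytes) { this.data = ByteBuffer.wrap(bytes); }

        void release(int count) { released = Math.min(data.capacity(), released + count); }

        @Override public int read(ByteBuffer dst) {
            int n = Math.min(dst.remaining(), released - data.position());
            for (int i = 0; i < n; i++) dst.put(data.get());
            return n; // 0 when nothing is available, like a non-blocking read
        }
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
    }

    public static void main(String[] args) throws IOException {
        // One frame on the wire: length prefix 8, then 8 payload bytes.
        ByteBuffer wire = ByteBuffer.allocate(12);
        wire.putInt(8).put(new byte[] {10, 20, 30, 40, 50, 60, 70, 80});
        TrickleChannel channel = new TrickleChannel(wire.array());
        FrameReader reader = new FrameReader();

        channel.release(5);                              // prefix plus one payload byte
        System.out.println(reader.poll(channel));        // null: frame incomplete
        System.out.println(reader.poll(channel));        // null: nothing new to read
        channel.release(7);                              // the rest of the payload
        System.out.println(reader.poll(channel).length); // 8
    }
}
```

With a real SocketChannel, one FrameReader per connection could be stored as the key's attachment and poll called whenever the key is readable. A production version would probably also cap the accepted frame length before allocating.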