Search code examples
javanionio2completionhandler

Java NIO2 concurrent completion handlers


I am writing NIO2 server, and I need to make asynchronous read operations on an AsynchronousSocketChannel, each of these operations consists of reading an integer, and further reading from the same channel number of bytes equal to this integer. Problem is, when I put two or more CompletionHandler's on channel in a row (cause there are requests for multiple reading operations), and first of these handlers gets fired, my further reading code in complete() method of the first handler cant work properly, because second handler gets fired instantly when there's info on channel. How can I block channel 'til further reading complete() completes without Future thing? I cant use Future cause I need to put handler to the socket and then pass to the other tasks.

for (final Map.Entry<String, AsynchronousSocketChannel> entry : ipSocketTable.entrySet()) {
        try {
            final AsynchronousSocketChannel outSocket = entry.getValue();
            synchronized (outSocket) {
                final ByteBuffer buf = ByteBuffer.allocateDirect(9);
                outSocket.read(buf, null, new DataServerResponseHandler(buf, outSocket, resultTable, server, entry.getKey()));
            }

        } catch (Exception e) {

        }
    }

Here is DataServerResponseHandler class:

class DataServerResponseHandler implements CompletionHandler<Integer, Void> {

    private ConcurrentHashMap<String, Boolean> resultTable = null;
    private AsynchronousSocketChannel channel = null;
    private TcpServer server;
    private String ip;
    private ByteBuffer msg;

    public DataServerResponseHandler(ByteBuffer msg, AsynchronousSocketChannel channel,
            ConcurrentHashMap<String, Boolean> resultTable, TcpServer server, String ip) {
        this.msg = msg;
        this.channel = channel;
        this.resultTable = resultTable;
        this.server = server;
        this.ip = ip;
    }

    @Override
    public void completed(Integer result, Void attachment) {
            try {
                msg.rewind();
                int resultCode = msg.get() & 0xff;
                int ipOne = msg.get() & 0xff;
                int ipTwo = msg.get() & 0xff;
                int ipThree = msg.get() & 0xff;
                int ipFour = msg.get() & 0xff;
                int length = msg.getInt();
                msg.rewind();
                ByteBuffer buf = ByteBuffer.allocateDirect(length);
                channel.read(buf).get();
                buf.rewind();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    @Override
    public void failed(Throwable exc, Void attachment) {
        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    }

}

Solution

  • This code has several problems.
    First read doesn't guarantee that it will read all remaining bytes but calls completion handler as soon as it reads at least one byte. Therefore you have to check position of buffer and re-invoke the read until you have 9 bytes for header or length for the payload.

    if (msg.position() < 9) {
        channel.read(msg, null, this);
        return;
    }
    

    For the second part in order to continue asynchronous approach again run the read with completion handler. You can create new one which would specifically handle the payload or reuse the existing one and you'll have to remember the state:

    switch (state) {
    case READ_HEADER:
        if (msg.remaining() > 0) {
            channel.read(msg, null, this);
            return;
        }
        // do the parsing the IP and length
        state = READ_PAYLOAD;
        channel.read(payloadBuf, null, this);
        break;
    
    case READ_PAYLOAD:
        if (payloadBuf.remaining() > 0) {
            channel.read(payloadBuf, null, this);
            return;
        }
        payloadBuf.flip();
        // get content from payloadBuf
        break;
    }