Search code examples
javamultithreadingnionio2

What is the multi threading model in Java NIO 2 - (Proactor pattern)?


I am trying to build a simple Echo Service using Java NIO 2 (which is based on Proactor Pattern).

In the simplest implementation, we have 4 main components; ProactorInitiator, AcceptConnectionHandler, ReadConnectionHandler and WriteConnectionHandler.

Following is my sample code.

ProactorInitiator.java

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;

public class ProactorInitiator {
    static int ASYNC_SERVER_PORT = 4333;

    public void initiateProactiveServer(int port)
            throws IOException {

        final AsynchronousServerSocketChannel listener =
                AsynchronousServerSocketChannel.open().bind(
                        new InetSocketAddress(port));
        AcceptCompletionHandler acceptCompletionHandler =
                new AcceptCompletionHandler(listener);

        SessionState state = new SessionState();
        listener.accept(state, acceptCompletionHandler);
        System.out.println("Proactor Initiator Running on "+Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        try {
            System.out.println("Async server listening on port : " +
                    ASYNC_SERVER_PORT);
            new ProactorInitiator().initiateProactiveServer(
                    ASYNC_SERVER_PORT);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Sleep indefinitely since otherwise the JVM would terminate
        while (true) {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

AcceptCompletionHandler.java

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AcceptCompletionHandler
        implements
        CompletionHandler<AsynchronousSocketChannel, SessionState> {

    private AsynchronousServerSocketChannel listener;

    public AcceptCompletionHandler(
            AsynchronousServerSocketChannel listener) {
        this.listener = listener;
    }

    @Override
    public void completed(AsynchronousSocketChannel socketChannel,
                          SessionState sessionState) {

        System.out.println("Accept Handler running on "+Thread.currentThread().getName());
        // accept the next connection
        SessionState newSessionState = new SessionState();
        listener.accept(newSessionState, this);

        // handle this connection
        ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
        ReadCompletionHandler readCompletionHandler =
                new ReadCompletionHandler(socketChannel, inputBuffer);
        socketChannel.read(
                inputBuffer, sessionState, readCompletionHandler);
    }

    @Override
    public void failed(Throwable exc, SessionState sessionState) {
        // Handle connection failure...
    }

}

ReadCompletionHandler.java

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class ReadCompletionHandler implements
        CompletionHandler<Integer, SessionState> {

    private AsynchronousSocketChannel socketChannel;
    private ByteBuffer inputBuffer;

    public ReadCompletionHandler(
            AsynchronousSocketChannel socketChannel,
            ByteBuffer inputBuffer) {
        this.socketChannel = socketChannel;
        this.inputBuffer = inputBuffer;
    }

    @Override
    public void completed(
            Integer bytesRead, SessionState sessionState) {

        System.out.println("Read Handler running on "+Thread.currentThread().getName());

        byte[] buffer = new byte[bytesRead];
        inputBuffer.rewind();
        // Rewind the input buffer to read from the beginning

        inputBuffer.get(buffer);
        String message = new String(buffer);

//        System.out.println("Received message from client : " + message);

//        message = GetRequestParser.getHTTPRequest(message, "200 OK");

        // Echo the message back to client
        WriteCompletionHandler writeCompletionHandler =
                new WriteCompletionHandler(socketChannel);

        ByteBuffer outputBuffer = ByteBuffer.wrap(message.getBytes());

        socketChannel.write(
                outputBuffer, sessionState, writeCompletionHandler);
    }

    @Override
    public void failed(Throwable exc, SessionState attachment) {
        //Handle read failure.....
    }

}

WriteCompletionHandler.java

import java.io.IOException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class WriteCompletionHandler implements
        CompletionHandler<Integer, SessionState> {

    private AsynchronousSocketChannel socketChannel;

    public WriteCompletionHandler(
            AsynchronousSocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void completed(
            Integer bytesWritten, SessionState attachment) {
        try {
            System.out.println("Write Handler running on "+Thread.currentThread().getName());
            System.out.println("\n");
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void failed(Throwable exc, SessionState attachment) {
        // Handle write failure.....
    }

}

SessionState.java

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SessionState {

    private Map<String, String> sessionProps =
            new ConcurrentHashMap<>();

    public String getProperty(String key) {
        return sessionProps.get(key);
    }

    public void setProperty(String key, String value) {
        sessionProps.put(key, value);
    }

}

In order to check the threading behaviour, I print the thread on which each handler runs to 'sys.out'.

Following are the different results I got, for many requests which are sent to the server one after the other.

Request 1

Accept Handler running on Thread-4
Read Handler running on Thread-4
Write Handler running on Thread-4

Request 2

Accept Handler running on Thread-4
Read Handler running on Thread-2
Write Handler running on Thread-2

Request 3

Accept Handler running on Thread-5
Read Handler running on Thread-3
Write Handler running on Thread-3

According to the above results, it seems like, for different requests, the server uses different threads. Also, both Read Handler and Write Handler are run on the same thread for a given request.

Can someone explain this result? As how handlers are scheduled on different threads?


Solution

  • As seen in your results for Thread.getCurrentThread().getName() for each Completion handler, in NIO2 (proactor pattern) the thread allocation for different Completion handlers is not specified and seems random. So, the best practice is not to assume any thread behaviour.

    For the sake of completeness, I am adding the thread behaviour of NIO, in the following.

    In NIO, each activity (be it socket accept, read or write) is run in a single thread (in which the selector loop runs.)