Search code examples
java multithreading nio nonblocking

Java NIO non-blocking read and write operations


For a project, I'm developing a NIO server that receives a message from the client containing the running times for the read and write operations. The first time I run the client everything works fine, but if I run the client again the server gets stuck in the writable part. Can you tell me what I am doing wrong? These are my files; thank you in advance.

MyAsyncProcessor.java

/**
 * Selector-driven server from the question: it listens on localhost:9876, reads one
 * message per connection, takes the last two whitespace-separated tokens of that
 * message as the read and write durations (in seconds, converted to milliseconds),
 * and submits a {@code MyAsyncReadThread} / {@code MyAsyncWriteThread} to a
 * two-thread pool.
 *
 * <p>{@code MyTask}, {@code MyAsyncReadThread} and {@code MyAsyncWriteThread} are
 * not part of this snippet.
 */
public class MyAsyncProcessor {

    /** Phase of a connection as recorded in {@code socketStates}. */
    enum States {
        Idle,
        Read,
        Write
    }

    // Worker pool; assigned in process() (fixed size of two threads).
    ExecutorService pool;
    // Connection phase keyed by SocketChannel.hashCode(). Entries are only ever
    // put/overwritten in this class; nothing removes them.
    private Map<Integer, States> socketStates = new HashMap<>();

    public MyAsyncProcessor() {
    }

    /** Starts the server; never returns unless process() throws. */
    public static void main(String[] args) throws IOException {
        new MyAsyncProcessor().process();
    }

    /**
     * Runs the accept/read/write event loop forever on the calling thread.
     *
     * @throws IOException if opening, binding, selecting or accepting fails
     */
    public void process() throws IOException {
        pool = Executors.newFixedThreadPool(2);
        InetAddress host = InetAddress.getByName("localhost");
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(host, 9876));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        SelectionKey key;
        while (true) {
            if (selector.select() > 0) {
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> i = selectedKeys.iterator();
                while (i.hasNext()) {
                    key = i.next();
                    // Remove the key from the selected set so it is not handled again.
                    i.remove();
                    // A fresh task is created for every selected key. The writable branch
                    // below therefore only sees the values set by the readable branch when
                    // both branches run in this same iteration for the same key.
                    MyTask task = new MyTask();
                    if (key.isAcceptable()) {
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        System.out.println("Channel hashCode: " + socketChannel.hashCode());
                        // The new connection is registered for both read and write readiness.
                        socketChannel.register(selector, SelectionKey.OP_READ + SelectionKey.OP_WRITE);
                        socketStates.put(socketChannel.hashCode(), States.Idle);
                        // NOTE(review): prints the local address although the message says
                        // "from" - getRemoteAddress() was probably intended; confirm.
                        System.out.println("Connection accepted from: " + socketChannel.getLocalAddress());
                    }
                    if (key.isReadable()) {
                        System.out.println("Readable");
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        States socketState = socketStates.get(socketChannel.hashCode());
                        // Only a connection that has not been read yet is processed.
                        if (socketState == States.Idle) {
                            socketStates.put(socketChannel.hashCode(), States.Read);
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            try {
                                // The number of bytes read (or an end-of-stream result) is not inspected.
                                socketChannel.read(byteBuffer);
                                // Decodes the whole 1024-byte backing array with the default
                                // charset and trims the surrounding whitespace/zero bytes.
                                String result = new String(byteBuffer.array()).trim();
                                String[] words = result.split(" ");
                                // Last two tokens are the durations in seconds; stored as milliseconds.
                                int secondsToRead = Integer.parseInt(words[words.length - 2])*1000;
                                int secondsToWrite = Integer.parseInt(words[words.length - 1])*1000;
                                task.setTimeToRead(secondsToRead);
                                task.setTimeToWrite(secondsToWrite);
                                System.out.println(task.getTimeToRead() + " " + task.getTimeToWrite());
                                Runnable h = new MyAsyncReadThread(task);
                                pool.execute(h);
                                // From now on the channel is registered for write readiness only.
                                socketChannel.register(selector, SelectionKey.OP_WRITE);
                            } catch (Exception e) {
                                // Any failure (I/O, parsing, too few tokens) ends up here; the
                                // channel itself is not closed despite the message.
                                System.out.println("Closing Connection Read...");
                            }
                        }
                    }
                    if (key.isWritable()) {
                        System.out.println("Writable");
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        States socketState = socketStates.get(socketChannel.hashCode());
                        // The write task is only scheduled once the request has been read.
                        if (socketState == States.Read) {
                            socketStates.put(socketChannel.hashCode(), States.Write);
                            System.out.println(task.getTimeToRead() + " " + task.getTimeToWrite());
                            Runnable h = new MyAsyncWriteThread(task);
                            pool.execute(h);
                        }
                        // NOTE(review): the key is cancelled on every writable event, whatever
                        // the state, so a connection that becomes writable before its request
                        // arrives is dropped from the selector without ever being read. The
                        // channel is not closed here. Likely related to the reported hang - confirm.
                        key.cancel();
                    }
                }
            }
        }
    }
}

MyClient.java

/**
 * One-shot test client: connects to the server on 127.0.0.1:9876 and sends a single
 * line whose last two tokens are the read and write durations in seconds.
 */
public class MyClient {

    /**
     * Picks a random read duration in [0, 10), uses read + 1 as the write duration,
     * sends the message and exits. Terminates the JVM with status -1 if the
     * connection cannot be opened or the message cannot be sent.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Random rand = new Random();
        int secondsToRead = rand.nextInt(10);
        int secondsToWrite = secondsToRead + 1;
        String message = "Seconds for the task to be read and written: " + secondsToRead + " " + secondsToWrite;
        System.out.println(message);
        // try-with-resources closes the writer and the socket on every path, so the
        // server reliably sees end-of-stream instead of a leaked connection.
        try (Socket socket = new Socket("127.0.0.1", 9876);
             PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true)) {
            printWriter.println(message);
            // PrintWriter never throws on write failures; it has to be asked explicitly.
            if (printWriter.checkError()) {
                throw new IOException("failed to send message to server");
            }
            System.out.println("Sending message");
        } catch (IOException e) {
            System.out.println("Error in Socket");
            System.exit(-1);
        }
    }
}





Solution

  • Sorry, I can't comment.

    You can't use key.cancel(); there. I don't know your business logic, so I can only advise you not to use a Map like that.

    JDK NIO is very hard. Here is your code (changed a bit); I hope it works for you.

    Don't write NIO code by yourself. [Netty](https://netty.io/) is good.

    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Minimal single-threaded NIO request/response server.
     *
     * <p>One selector thread accepts connections on localhost:9876, reads a request,
     * hands a {@link MyTask} to a two-thread pool, answers with {@code "hello world!"}
     * and then waits for the next request on the same connection. A connection is
     * closed when the peer disconnects or any I/O on it fails.
     *
     * @author [email protected]
     * @since 2022/12/13 09:46
     */
    public class MyAsyncProcessor {

        /**
         * Connection phases used by the original question's code. Kept so existing
         * references still compile; the event loop below does not need them because
         * the phase is encoded in each key's interest set.
         */
        enum States {
            Idle,
            Read,
            Write
        }

        private static final int PORT = 9876;
        private static final int BUFFER_SIZE = 1024;
        private static final byte[] REPLY = "hello world!".getBytes(StandardCharsets.UTF_8);

        /** Worker pool for {@link MyTask}s; created by {@link #process()}. */
        ExecutorService pool;

        public MyAsyncProcessor() {
        }

        /** Unit of work submitted to the pool for every received request. */
        public static class MyTask implements Runnable {

            private int secondsToRead;
            private int secondsToWrite;

            @Override
            public void run() {
                System.out.println("execute task");
            }

            public void setTimeToRead(int secondsToRead) {
                this.secondsToRead = secondsToRead;
            }

            public void setTimeToWrite(int secondsToWrite) {
                this.secondsToWrite = secondsToWrite;
            }

            /** @return the value last passed to {@link #setTimeToRead(int)} */
            public int getTimeToRead() {
                return secondsToRead;
            }

            /** @return the value last passed to {@link #setTimeToWrite(int)} */
            public int getTimeToWrite() {
                return secondsToWrite;
            }
        }

        public static void main(String[] args) throws IOException {
            new MyAsyncProcessor().process();
        }

        /**
         * Runs the event loop on the calling thread. Does not return normally.
         *
         * @throws IOException if the selector or the listening socket fails; the
         *         selector, the listening channel and the pool are released first
         */
        public void process() throws IOException {
            pool = Executors.newFixedThreadPool(2);
            InetAddress host = InetAddress.getByName("localhost");
            try (Selector selector = Selector.open();
                 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.bind(new InetSocketAddress(host, PORT));
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                while (true) {
                    if (selector.select() == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> i = selector.selectedKeys().iterator();
                    while (i.hasNext()) {
                        SelectionKey key = i.next();
                        i.remove();
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isAcceptable()) {
                            handleAccept(selector, serverSocketChannel);
                            continue;
                        }
                        if (key.isReadable()) {
                            handleRead(key);
                        }
                        // handleRead may have closed the channel, which invalidates the key.
                        if (key.isValid() && key.isWritable()) {
                            handleWrite(key);
                        }
                    }
                }
            } finally {
                pool.shutdown();
            }
        }

        /** Accepts one pending connection and registers it for reading. */
        private void handleAccept(Selector selector, ServerSocketChannel server) throws IOException {
            SocketChannel client = server.accept();
            if (client == null) {
                // Non-blocking accept: the connection may already be gone.
                return;
            }
            try {
                client.configureBlocking(false);
                System.out.println("Channel hashCode: " + client.hashCode());
                client.register(selector, SelectionKey.OP_READ);
                System.out.println("Connection accepted from: " + client.getRemoteAddress());
            } catch (IOException e) {
                System.err.println("Dropping connection that failed during setup: " + e);
                closeQuietly(client);
            }
        }

        /**
         * Reads one request. On data: schedules a task, queues the reply and switches
         * the key to write interest. On end-of-stream or failure: closes the channel.
         */
        private void handleRead(SelectionKey key) {
            System.out.println("Readable");
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
            try {
                int read = client.read(byteBuffer);
                if (read < 0) {
                    // Peer closed the connection; without this the key would bounce
                    // between read and write interest forever.
                    closeQuietly(client);
                    return;
                }
                if (read == 0) {
                    // Nothing arrived; keep waiting for a request.
                    return;
                }
                System.out.println("receive message form client:" + decodeMessage(byteBuffer.array(), read));
                MyTask task = new MyTask();
                task.setTimeToRead(10);
                task.setTimeToWrite(10);
                pool.execute(task);
                // The pending reply travels with the key so a partial write can resume.
                key.attach(ByteBuffer.wrap(REPLY));
                key.interestOps(SelectionKey.OP_WRITE);
            } catch (IOException | RuntimeException e) {
                System.err.println("Closing connection after read failure: " + e);
                closeQuietly(client);
            }
        }

        /** Writes the queued reply; returns to read interest once it is fully sent. */
        private void handleWrite(SelectionKey key) {
            System.out.println("Writable");
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer pending = (ByteBuffer) key.attachment();
            if (pending == null) {
                pending = ByteBuffer.wrap(REPLY);
                key.attach(pending);
            }
            try {
                client.write(pending);
                if (!pending.hasRemaining()) {
                    key.attach(null);
                    key.interestOps(SelectionKey.OP_READ);
                }
            } catch (IOException e) {
                System.err.println("Closing connection after write failure: " + e);
                closeQuietly(client);
            }
        }

        /**
         * Decodes the first {@code length} bytes of {@code data} as UTF-8, dropping any
         * trailing CR/LF line terminators.
         *
         * @param data   buffer holding the received bytes
         * @param length number of valid bytes at the start of {@code data}
         * @return the message text without its line terminator
         */
        static String decodeMessage(byte[] data, int length) {
            int end = length;
            while (end > 0 && (data[end - 1] == '\n' || data[end - 1] == '\r')) {
                end--;
            }
            return new String(data, 0, end, StandardCharsets.UTF_8);
        }

        /** Closes a client channel; a failure here must not take the server down. */
        private static void closeQuietly(SocketChannel channel) {
            try {
                channel.close();
            } catch (IOException ignored) {
                // The connection is being discarded anyway.
            }
        }
    }