I'm developing for a project a NIO server that takes as input a message from the client containing the times of running for the read and write operations. I have a problem because at the first execution of the client everything works fine, but if I run the client once again the server gets stuck in the writable part. Can you tell me what am I doing wrong? These are my files, thank you in advance.
MyAsyncProcessor.java
public class MyAsyncProcessor {
enum States {
Idle,
Read,
Write
}
ExecutorService pool;
private Map<Integer, States> socketStates = new HashMap<>();
public MyAsyncProcessor() {
}
public static void main(String[] args) throws IOException {
new MyAsyncProcessor().process();
}
public void process() throws IOException {
pool = Executors.newFixedThreadPool(2);
InetAddress host = InetAddress.getByName("localhost");
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(host, 9876));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey key;
while (true) {
if (selector.select() > 0) {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> i = selectedKeys.iterator();
while (i.hasNext()) {
key = i.next();
i.remove();
MyTask task = new MyTask();
if (key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("Channel hashCode: " + socketChannel.hashCode());
socketChannel.register(selector, SelectionKey.OP_READ + SelectionKey.OP_WRITE);
socketStates.put(socketChannel.hashCode(), States.Idle);
System.out.println("Connection accepted from: " + socketChannel.getLocalAddress());
}
if (key.isReadable()) {
System.out.println("Readable");
SocketChannel socketChannel = (SocketChannel) key.channel();
States socketState = socketStates.get(socketChannel.hashCode());
if (socketState == States.Idle) {
socketStates.put(socketChannel.hashCode(), States.Read);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
socketChannel.read(byteBuffer);
String result = new String(byteBuffer.array()).trim();
String[] words = result.split(" ");
int secondsToRead = Integer.parseInt(words[words.length - 2])*1000;
int secondsToWrite = Integer.parseInt(words[words.length - 1])*1000;
task.setTimeToRead(secondsToRead);
task.setTimeToWrite(secondsToWrite);
System.out.println(task.getTimeToRead() + " " + task.getTimeToWrite());
Runnable h = new MyAsyncReadThread(task);
pool.execute(h);
socketChannel.register(selector, SelectionKey.OP_WRITE);
} catch (Exception e) {
System.out.println("Closing Connection Read...");
}
}
}
if (key.isWritable()) {
System.out.println("Writable");
SocketChannel socketChannel = (SocketChannel) key.channel();
States socketState = socketStates.get(socketChannel.hashCode());
if (socketState == States.Read) {
socketStates.put(socketChannel.hashCode(), States.Write);
System.out.println(task.getTimeToRead() + " " + task.getTimeToWrite());
Runnable h = new MyAsyncWriteThread(task);
pool.execute(h);
}
key.cancel();
}
}
}
}
}
}
MyClient.java
public class MyClient {
public static void main(String [] args) {
Random rand = new Random();
int secondsToRead = rand.nextInt(10);
int secondsToWrite = secondsToRead + 1;
String message = "Seconds for the task to be read and written: " + secondsToRead + " " + secondsToWrite;
System.out.println(message);
Socket socket;
try {
socket = new Socket("127.0.0.1", 9876);
PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
printWriter.println(message);
System.out.println("Sending message");
} catch (IOException e) {
System.out.println("Error in Socket");
System.exit(-1);
}
}
}
sorry , I can't comment.
you can't use key.cancel();
, I don't know your business, only I can advice just don't use Map like that.
JDK.NIO
is very hard. here is your code(change a bit) , hoping work for you.
Don't write NIO code buy yourself. [https://netty.io/][Netty] is good.
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author [email protected]
* @since 2022/12/13 09:46
*/
public class MyAsyncProcessor {
enum States {
Idle,
Read,
Write
}
ExecutorService pool;
private Map<Integer, States> socketStates = new HashMap<>();
public MyAsyncProcessor() {
}
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println("execute task");
}
private int secondsToRead;
private int secondsToWrite;
public void setTimeToRead(int secondsToRead) {
this.secondsToRead = secondsToRead;
}
public void setTimeToWrite(int secondsToWrite) {
this.secondsToWrite = secondsToWrite;
}
}
public static void main(String[] args) throws IOException {
new MyAsyncProcessor().process();
}
public void process() throws IOException {
pool = Executors.newFixedThreadPool(2);
InetAddress host = InetAddress.getByName("localhost");
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(host, 9876));
final SelectionKey register1 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
register1.attach(serverSocketChannel);
SelectionKey key;
while (true) {
if (selector.select() > 0) {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> i = selectedKeys.iterator();
while (i.hasNext()) {
key = i.next();
i.remove();
MyTask task = new MyTask();
if (!key.isValid()) {
key.cancel();
continue;
}
if (key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("Channel hashCode: " + socketChannel.hashCode());
final SelectionKey register = socketChannel.register(selector, SelectionKey.OP_READ);
register.attach(key.attachment());
System.out.println("Connection accepted from: " + socketChannel.getLocalAddress());
}
if (key.isReadable()) {
System.out.println("Readable");
SocketChannel socketChannel = (SocketChannel) key.channel();
socketStates.put(socketChannel.hashCode(), States.Read);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
final int read = socketChannel.read(byteBuffer);
if (read > 0) {
System.out.println("receive message form client:" + new String(byteBuffer.array(), 0, read - 1));
task.setTimeToRead(10);
task.setTimeToWrite(10);
pool.execute(task);
}
socketChannel.register(selector, SelectionKey.OP_WRITE);
} catch (Exception e) {
socketChannel.close();
}
}
if (key.isValid() && key.isWritable()) {
System.out.println("Writable");
SocketChannel socketChannel = (SocketChannel) key.channel();
try {
socketChannel.write(ByteBuffer.wrap("hello world!".getBytes(StandardCharsets.UTF_8)));
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
socketChannel.close();
}
}
}
}
}
}
}