I wrote an app which connects to server by TCP with SocketChannel but I have two issues:
Any idea what is wrong?
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketSelectorWorker extends Thread {
private static final transient Logger log = LoggerFactory.getLogger(SocketSelectorWorker.class);
private ExecutorService executorService = Executors.newFixedThreadPool(3);
private final Queue<byte[]> messages;
private Selector selector;
public SocketSelectorWorker(Queue messages, Selector selector) {
super();
this.selector = selector;
this.session = session;
this.messages = messages;
}
@Override
public void run() {
super.run();
while (isConnectionAlive()) {
try {
// Wait for an event
selector.select();
} catch (IOException e) {
log.error("Selector error: {}", e.toString());
log.debug("Stacktrace: ", e);
session.closeConnection();
break;
}
handleSelectorkeys(selector.selectedKeys());
}
executorService.shutdown();
log.debug("worker stopped");
}
private void handleSelectorkeys(Set<SelectionKey> selectedKeys) {
for (SelectionKey selKey : selector.selectedKeys()) {
selector.selectedKeys().remove(selKey);
try {
processSelectionKey(selKey);
} catch (IOException e) {
// Handle error with channel and unregister
selKey.cancel();
log.error("Selector error: {}", e.toString());
log.debug("Stacktrace: ", e);
}
}
}
public void processSelectionKey(SelectionKey selKey) throws IOException {
// Since the ready operations are cumulative,
// need to check readiness for each operation
if (selKey.isValid() && selKey.isConnectable()) {
log.debug("connectable");
// Get channel with connection request
SocketChannel sChannel = (SocketChannel) selKey.channel();
boolean success = sChannel.finishConnect();
if (!success) {
// An error occurred; handle it
log.error("Error on finish");
// Unregister the channel with this selector
selKey.cancel();
}
}
if (selKey.isValid() && selKey.isReadable()) {
log.debug("readable");
readMessage(selKey);
}
if (selKey.isValid() && selKey.isWritable()) {
log.debug("writable");
writeMessage(selKey);
}
if (selKey.isValid() && selKey.isAcceptable()) {
log.debug("Acceptable");
}
}
private void writeMessage(SelectionKey selKey) throws IOException {
byte[] message = messages.poll();
if (message == null) {
return;
}
// Get channel that's ready for more bytes
SocketChannel socketChannel = (SocketChannel) selKey.channel();
// See Writing to a SocketChannel
// Create a direct buffer to get bytes from socket.
// Direct buffers should be long-lived and be reused as much as
// possible.
ByteBuffer buf = ByteBuffer.allocateDirect(1024);// .allocateDirect(toSend.getBytes().length);
// try {
// Fill the buffer with the bytes to write;
// see Putting Bytes into a ByteBuffer
// buf.put((byte)0xFF);
buf.clear();
buf.put(new byte[] { 0x02 });
buf.put(message);
buf.put(new byte[] { 0x03 });
// Prepare the buffer for reading by the socket
buf.flip();
// Write bytes
int numBytesWritten = socketChannel.write(buf);
log.debug("Written: {}", numBytesWritten);
while (buf.hasRemaining()) {
numBytesWritten = socketChannel.write(buf);
log.debug("Written remining: {}", numBytesWritten);
}
}
private void readMessage(SelectionKey selKey) throws IOException {
// Get channel with bytes to read
SocketChannel socketChannel = (SocketChannel) selKey.channel();
// See Reading from a SocketChannel
// Create a direct buffer to get bytes from socket.
// Direct buffers should be long-lived and be reused as much as
// possible.
ByteBuffer buf = ByteBuffer.allocateDirect(2048);
Charset charset = Charset.forName("UTF-8");// Charset.forName("ISO-8859-1");
CharsetDecoder decoder = charset.newDecoder();
// try {
// Clear the buffer and read bytes from socket
buf.clear();
int numBytesRead = socketChannel.read(buf);
if (numBytesRead == -1) {
// No more bytes can be read from the channel
// socketChannel.close();
return;
}
log.debug("Read bytes: {}", numBytesRead);
// To read the bytes, flip the buffer
buf.flip();
String result = decoder.decode(buf).toString();
log.debug("Read string: {}", result);
//processMessage(result.getBytes());
}
}
You aren't processing the selection keys correctly. You must remove via the iterator while iterating, not via the set. That means you can't use the enhanced for-loop. Probably you are skipping keys.
When you get -1 from read()
you must close the channel.
When you get an IOException
it isn't sufficient to just cancel the key. You should close the channel, which NB cancels they key automatically.