Search code examples
javaniosocketchannel

SocketChannel issue


I wrote an app which connects to server by TCP with SocketChannel but I have two issues:

  1. The first is minor - sometimes for some unknown reason I send concatenated messages and
  2. the second is crucial - periodically the app stops sends/receives messages

Any idea what is wrong?

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketSelectorWorker extends Thread {
  private static final transient Logger log = LoggerFactory.getLogger(SocketSelectorWorker.class);
  private ExecutorService executorService = Executors.newFixedThreadPool(3);
  private final Queue<byte[]> messages;
  private Selector selector;

  public SocketSelectorWorker(Queue messages, Selector selector) {
    super();
    this.selector = selector;
    this.session = session;
    this.messages = messages;
  }

  @Override
  public void run() {
    super.run();
    while (isConnectionAlive()) {
      try {
        // Wait for an event
        selector.select();
      } catch (IOException e) {
        log.error("Selector error: {}", e.toString());
        log.debug("Stacktrace: ", e);
        session.closeConnection();
        break;
      }
      handleSelectorkeys(selector.selectedKeys());
    }
    executorService.shutdown();
    log.debug("worker stopped");
  }

  private void handleSelectorkeys(Set<SelectionKey> selectedKeys) {
    for (SelectionKey selKey : selector.selectedKeys()) {
      selector.selectedKeys().remove(selKey);
      try {
        processSelectionKey(selKey);
      } catch (IOException e) {
        // Handle error with channel and unregister
        selKey.cancel();
        log.error("Selector error: {}", e.toString());
        log.debug("Stacktrace: ", e);
      }
    }
  }

  public void processSelectionKey(SelectionKey selKey) throws IOException {

    // Since the ready operations are cumulative,
    // need to check readiness for each operation
    if (selKey.isValid() && selKey.isConnectable()) {
      log.debug("connectable");
      // Get channel with connection request
      SocketChannel sChannel = (SocketChannel) selKey.channel();

      boolean success = sChannel.finishConnect();
      if (!success) {
        // An error occurred; handle it
        log.error("Error on finish");
        // Unregister the channel with this selector
        selKey.cancel();
      }
    }

    if (selKey.isValid() && selKey.isReadable()) {
      log.debug("readable");
      readMessage(selKey);
    }

    if (selKey.isValid() && selKey.isWritable()) {
      log.debug("writable");
      writeMessage(selKey);
    }

    if (selKey.isValid() && selKey.isAcceptable()) {
      log.debug("Acceptable");
    }

  }

  private void writeMessage(SelectionKey selKey) throws IOException {
    byte[] message = messages.poll();
    if (message == null) {
      return;
    }

    // Get channel that's ready for more bytes
    SocketChannel socketChannel = (SocketChannel) selKey.channel();

    // See Writing to a SocketChannel
    // Create a direct buffer to get bytes from socket.
    // Direct buffers should be long-lived and be reused as much as
    // possible.

    ByteBuffer buf = ByteBuffer.allocateDirect(1024);// .allocateDirect(toSend.getBytes().length);

    // try {
    // Fill the buffer with the bytes to write;
    // see Putting Bytes into a ByteBuffer
    // buf.put((byte)0xFF);

    buf.clear();
    buf.put(new byte[] { 0x02 });
    buf.put(message);
    buf.put(new byte[] { 0x03 });
    // Prepare the buffer for reading by the socket
    buf.flip();

    // Write bytes
    int numBytesWritten = socketChannel.write(buf);
    log.debug("Written: {}", numBytesWritten);

    while (buf.hasRemaining()) {
      numBytesWritten = socketChannel.write(buf);
      log.debug("Written remining: {}", numBytesWritten);
    }
  }

  private void readMessage(SelectionKey selKey) throws IOException {
    // Get channel with bytes to read
    SocketChannel socketChannel = (SocketChannel) selKey.channel();

    // See Reading from a SocketChannel
    // Create a direct buffer to get bytes from socket.
    // Direct buffers should be long-lived and be reused as much as
    // possible.
    ByteBuffer buf = ByteBuffer.allocateDirect(2048);
    Charset charset = Charset.forName("UTF-8");// Charset.forName("ISO-8859-1");
    CharsetDecoder decoder = charset.newDecoder();

    // try {
    // Clear the buffer and read bytes from socket
    buf.clear();
    int numBytesRead = socketChannel.read(buf);

    if (numBytesRead == -1) {
      // No more bytes can be read from the channel
      // socketChannel.close();
      return;
    }
    log.debug("Read bytes: {}", numBytesRead);
    // To read the bytes, flip the buffer
    buf.flip();

    String result = decoder.decode(buf).toString();

    log.debug("Read string: {}", result);
    //processMessage(result.getBytes());
  }

}

Solution

    1. TCP doesn't have message boundaries. It is a byte-stream protocol. Any message boundaries are up to you.
    2. You aren't processing the selection keys correctly. You must remove via the iterator while iterating, not via the set. That means you can't use the enhanced for-loop. Probably you are skipping keys.

    3. When you get -1 from read() you must close the channel.

    4. When you get an IOException it isn't sufficient to just cancel the key. You should close the channel, which NB cancels they key automatically.