Search code examples
javanio

Java NIO Framework stops working under heavy load with no write


The problem is fairly odd to me although I'm a newbie.

What's going on is that if you force the server under heavy load of connections and keep sending an invalid packet that doesn't represent POLICY_XML packet.

Pretty much what I'm trying to say is that if you connect it goes into socket READ OPERATION. Then you never go into send() which changes SelectionKey into WRITE OPERATION. Somehow the read operations stack up and after 2000 or so connection requests the server will stop accepting connections, no matter what. I've tried to connect with telnet and always fail to make a connection.. But after around 5 minutes it starts accepting connections again and becomes fully functional.

Very strange problem but note that if I remove the packet matching statement it will act similarly to an echo server. Then it will run endlessly without experiencing any connection accepting issues, pretty much turns stable.

I've attached the whole server source code below. Can someone who has extensive knowledge with NIO please check it out and let me know if there is a way to fix it.

All that really catches my eye is the selector wakeup in send() which may fix everything after putting the line below into the read() it seems to do absolutely nothing and the problem remains.

// Finally, wake up our selecting thread so it can make the required changes
this.selector.wakeup();

Here is the source for the simple server.

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

public class PolicyServer implements Runnable {
    public static final String POLICY_REQUEST = "<policy-file-request/>";
    public static final String POLICY_XML =
        "<?xml version=\"1.0\"?>"
        + "<cross-domain-policy>"
        + "<allow-access-from domain=\"*\" to-ports=\"*\" />"
        + "</cross-domain-policy>"
        + (char)0;


    // The host:port combination to listen on
    private InetAddress hostAddress;
    private int port;

    // The channel on which we'll accept connections
    private ServerSocketChannel serverChannel;

    // The selector we'll be monitoring
    private Selector selector;

    // The buffer into which we'll read data when it's available
    private ByteBuffer readBuffer = ByteBuffer.allocate(255);

    // This decodes raw bytes into ascii data.
    private CharsetDecoder asciiDecoder;

    // A list of PendingChange instances
    private List<ChangeRequest> pendingChanges = new LinkedList<ChangeRequest>();

    // Maps a SocketChannel to a list of ByteBuffer instances
    private Map<SocketChannel, List<ByteBuffer>> pendingData = new HashMap<SocketChannel, List<ByteBuffer>>();

    public PolicyServer(InetAddress hostAddress, int port) throws IOException {
        this.hostAddress = hostAddress;
        this.port = port;
        this.selector = this.initSelector();
        this.asciiDecoder = Charset.forName("US-ASCII").newDecoder().onMalformedInput(
                                CodingErrorAction.REPLACE).onUnmappableCharacter(
                                CodingErrorAction.REPLACE);
    }

    public void send(SocketChannel socket, byte[] data) {
        synchronized (this.pendingChanges) {
            // Indicate we want the interest ops set changed
            this.pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));

            // And queue the data we want written
            synchronized (this.pendingData) {
                List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket);
                if (queue == null) {
                    queue = new ArrayList<ByteBuffer>();
                    this.pendingData.put(socket, queue);
                }
                queue.add(ByteBuffer.wrap(data));
            }
        }

        // Finally, wake up our selecting thread so it can make the required changes
        this.selector.wakeup();
    }

    public void run() {
        while (true) {
            try {
                // Process any pending changes
                synchronized (this.pendingChanges) {
                    Iterator changes = this.pendingChanges.iterator();
                    while (changes.hasNext()) {
                        ChangeRequest change = (ChangeRequest) changes.next();
                        changes.remove();
                        if(change == null) continue;
                        switch (change.type) {
                        case ChangeRequest.CHANGEOPS:
                            SelectionKey key = change.socket.keyFor(this.selector);
                            try {
                                if(key!=null)
                                    key.interestOps(change.ops);
                            } catch(Exception ex) {
                                if (key!=null)
                                    key.cancel();
                            }
                        }
                    }
                    this.pendingChanges.clear();
                }

                // Wait for an event one of the registered channels
                this.selector.select();

                // Iterate over the set of keys for which events are available
                Iterator selectedKeys = this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key = (SelectionKey) selectedKeys.next();
                    selectedKeys.remove();

                    if (!key.isValid()) {
                        key.cancel();
                        continue;
                    }

                    // Check what event is available and deal with it
                    try {
                        if (key.isAcceptable()) {
                            this.accept(key);
                        } else if (key.isReadable()) {
                            this.read(key);
                        } else if (key.isWritable()) {
                            this.write(key);
                        }
                    } catch(IOException io) {
                        this.pendingData.remove(key.channel());
                        try {
                            ((SocketChannel)key.channel()).socket().close();
                        } catch (IOException e) {}
                        key.channel().close();
                        key.cancel();
                        key.attach(null);
                        key = null;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {
        // For an accept to be pending the channel must be a server socket channel.
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        // Accept the connection and make it non-blocking
        SocketChannel socketChannel = serverSocketChannel.accept();
        Socket socket = socketChannel.socket();
        socketChannel.configureBlocking(false);

        // Register the new SocketChannel with our Selector, indicating
        // we'd like to be notified when there's data waiting to be read
        // also contains a attachment of a new StringBuffer (for storing imcomplete/multi packets)
        socketChannel.register(this.selector, SelectionKey.OP_READ, new StringBuffer());
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead = socketChannel.read(this.readBuffer);

        if (numRead == -1) {
            // Remote entity shut the socket down cleanly. Do the
            // same from our end and cancel the channel.
            throw new IOException("");
        }

        // Grab the StringBuffer we stored as the attachment
        StringBuffer sb = (StringBuffer)key.attachment();

        // Flips the readBuffer by setting the current position of
        // packet stream to beginning.
        // Append the data to the attachment StringBuffer
        this.readBuffer.flip();
        sb.append(this.asciiDecoder.decode(this.readBuffer).toString());
        this.readBuffer.clear();

        // Get the policy request as complete packet
        if(sb.indexOf("\0") != -1) {
            String packets = new String(sb.substring(0, sb.lastIndexOf("\0")+1));
            sb.delete(0, sb.lastIndexOf("\0")+1);
            if(packets.indexOf(POLICY_REQUEST) != -1)
                send(socketChannel, POLICY_XML.getBytes());
        } else if(sb.length() >  8192) {
            sb.setLength(0);
            //Force disconnect.
            throw new IOException("");
        }
    }

    private void write(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        synchronized (this.pendingData) {
            List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);

            if(queue == null || queue.isEmpty()) {
                // We wrote away all data, so we're no longer interested
                // in writing on this socket. Switch back to waiting for
                // data.
                try {
                    if (key!=null)
                        key.interestOps(SelectionKey.OP_READ);
                } catch(Exception ex) {
                    if (key!=null)
                        key.cancel();
                }
            }

            // Write until there's not more data ...
            while (!queue.isEmpty()) {
                ByteBuffer buf = (ByteBuffer) queue.get(0);
                socketChannel.write(buf);
                if (buf.remaining() > 0) {
                    // ... or the socket's buffer fills up
                    break;
                }
                queue.remove(0);
            }
        }
    }

    private Selector initSelector() throws IOException {
        // Create a new selector
        Selector socketSelector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
        serverChannel.socket().bind(isa);

        // Register the server socket channel, indicating an interest in
        // accepting new connections
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        return socketSelector;
    }

    public static void main(String[] args) {
        try {
            new Thread(new PolicyServer(null, 5556)).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


import java.nio.channels.SocketChannel;

public class ChangeRequest {
    public static final int CHANGEOPS = 1;

    public SocketChannel socket;
    public int type;
    public int ops;

    public ChangeRequest(SocketChannel socket, int type, int ops) {
        this.socket = socket;
        this.type = type;
        this.ops = ops;
    }
}

Solution

  • ((SocketChannel)key.channel()).socket().close()

    You don't need all that. Change that to:

    key.channel().close()
    

    send() which changes SelectionKey into WRITE OPERATION

    I would want to see the details of that. More likely you are never getting out of the OP_WRITE state.