Search code examples
javaqueuenio

How to let selector that socketchannel key change in java nio


I have question on usage java nio and hope someone who has a lot of java nio knowledge can help me to clarify some misconcept.

I am using java nio socket. It is possible that the write buffer is filled up using socketchannel.write(). In this case, the remaining buffer is queued and key is changed to OP_WRITE. One of my scenario is that the queue length is pretty long. Each time before call selector.select(),I change key to OP_WRITE from another queue called pendingRequest. But I find since the read is pretty slow, after the sending processing finshes, there are many messages unwritten and they are still in the queue. How to handle this problem?

In my code, I have two writing place. One is from generator: when it has message to publish, it write to channel directly. If buffer is full, the data will be queued. A second place is in dispatcher: when key is writable, it call back write() to write the queued data. I guess the two parts can compete for the write. I just feel that my code lack some handling to cooperate two writes.

Is there any solution to solve my problem presented above? I find in my code many queued data cannot be written out. When the key is writable, the generator may write data again, which cause the queued data has less change to be written out. How to make this part right? Thanks

// In WriteListener(), the writing code are the following three parts

   public synchronized int writeData(EventObject source) {      
    int n = 0; 
    int count = 0;

    SocketChannel socket = (SocketChannel)source.getSource();       
    ByteBuffer buffer = ((WriteEvent)source).getBuffer();   
    try {
        write(socket);
    } catch (IOException e1) {          
        e1.printStackTrace();
    }       

    while (buffer.position()>0) {   
        try {           
                buffer.flip();  
                n = socket.write(buffer);                                   
                if(n == 0) {
                        key.interestOps(SelectionKey.OP_WRITE);                         synchronized (this.pendingData) {  
                            List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket); 
                            if(queue == null) {
                                queue = new ArrayList<ByteBuffer>();
                                this.pendingData.put(socket, queue); 
                        }
                        queue.add(buffer);

                        logger.logInfo("queue length:" + queue.size());
                    }                                               
                    break;
                }               
                count += n; 

        } catch (IOException e) {               
            e.printStackTrace();
        }   finally {                       
            buffer.compact();              
        }
    }   

    if(buffer.position()==0) {                      
        key.interestOps(SelectionKey.OP_READ);                  
    }
            return count;   

}   

// ==== This write method is used to write queued buffer

  public synchronized int write(SocketChannel sc, ByteBuffer wbuf) {        
    int n = 0; 
    int count = 0;

    SelectionKey key = sc.keyFor(this.dispatcher.getDemultiplexer().getDemux());                
    while (wbuf.position()>0) {     
        try {           
            wbuf.flip();        

            n = sc.write(wbuf);             

            if(n == 0) {    
                   key.interestOps(SelectionKey.OP_WRITE);                                  
                    synchronized (this.pendingData) {  
                        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(sc); 
                        if(queue == null) {
                                queue = new ArrayList<ByteBuffer>();
                                this.pendingData.put(sc, queue); 
                        }
                        queue.add(wbuf);
                    }

                    break;
                }               
                count += n; 

        } catch (IOException e) {               
            e.printStackTrace();
        }   finally {               

            wbuf.compact();                
        }
    }   

    if(wbuf.position()==0) {    
        wbuf.clear();               
        key.interestOps(SelectionKey.OP_READ);          
    }

return n;       
}   

// ====This method is a callback of Dispatch when key.isWritable() is true

public void write(SocketChannel socketChannel) throws IOException {         
   SelectionKey key = socketChannel.keyFor(this.dispatcher.getDemultiplexer().getDemux());     
    synchronized (this.pendingData) {             
        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);              
        if(queue == null || queue.isEmpty()) {                 
            // We wrote away all data, so we're no longer interested                 
            // in writing on this socket. Switch back to waiting for  data.                 
            try {                     
                if (key!=null)                         
                    key.interestOps(SelectionKey.OP_READ);                 
            } catch(Exception ex) {                     
                if (key!=null)                         
                    key.cancel();                 
                }             
        }           

        // Write until there's not more data ...    
        int n = 0;
        while (queue != null && !queue.isEmpty()) {                 
            ByteBuffer buf = (ByteBuffer) queue.get(0);   
            // zero length write, break the loop and wait for next writable time 
            n = write(socketChannel, buf);

            logger.logInfo("queue length:" + queue.size() + " used time: " + (t2-t1) + " ms.");

            if(n==0)  {             
                break;
            }
                      queue.remove(0); 

        }        

 }   

Solution

  • If you have a consumer which is too slow, the only option may be to disconnect them to protect your server. You don't want one bad consumer impacting your other clients.

    I usually increase the send buffer size to the point where if it fills, I close the connection. This avoid the complexity of handing unwritten data in Java code because all you are really doing is extending the buffer a little bit more. If you increase the send buffer size, you are doing this transparently. It is possible you don't even need to play with the send buffer size, the default is usually about 64 KB.