
Kafka Source - Understanding the semantics of Selector.poll()


I was looking into Kafka's network layer code and have a few questions regarding the Selector class, particularly how the poll() method is implemented. The poll() method goes something like this:

void poll(int timeout){
....
    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

...
}

Is there a specific reason why we call the pollSelectionKeys() method first for the keys returned by the select() method and then for the immediately connected keys? Is it just for clarity that we perform these operations separately, or are there specific requirements involved?

Secondly, in the pollSelectionKeys() method, we have:

void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                       boolean isImmediatelyConnected,
                       long currentTimeNanos) {
...
    /* if channel is ready write to any sockets that have space in their buffer and for which
    we have data */
    if (channel.ready() && key.isWritable()) {
        Send send = channel.write();
        if (send != null) {
            this.completedSends.add(send);
            this.sensors.recordBytesSent(channel.id(), send.size());
        }
    }
...
}

From what I understand, we only ever write to a KafkaChannel when either its key belongs to the key set we obtained from the earlier call to the select() method, or it is associated with one of the immediatelyConnectedKeys. My question is: why do we write to the KafkaChannels this way? More specifically, why don't we just iterate over all the KafkaChannels that have been connected and write to them if they have a Send object associated with them? That way, we would write to a KafkaChannel as soon as we can, without waiting for it to show up in the immediatelyConnectedKeys or the ready keys.


Solution

  • The answer lies in the connect() method of the Selector class (relevant portion below):

        connected = socketChannel.connect(address);
        ...
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    

    As per the Javadoc of NIO's SocketChannel.connect():

    If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation. If the connection is established immediately, as can happen with a local connection, then this method returns true. Otherwise this method returns false and the connection operation must later be completed by invoking the finishConnect method.
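    A quick way to see both outcomes the Javadoc describes is the small sketch below. It is not Kafka code: it opens a throwaway loopback server, starts a non-blocking connect(), and only falls back to finishConnect() when connect() returned false. The class and method names are illustrative, and whether connect() returns true immediately on loopback depends on the platform, so the code handles both cases.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Illustrative demo class (not part of Kafka).
final class NonBlockingConnectDemo {
    /** Connects to a loopback server in non-blocking mode; returns true once connected. */
    static boolean connectLoopback() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            // Bind to an ephemeral port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            client.configureBlocking(false);

            // true  -> connection established immediately (can happen for local connections)
            // false -> connection pending; it must be completed with finishConnect()
            boolean connected = client.connect(server.getLocalAddress());
            System.out.println("connect() returned " + connected);

            // Busy-waiting only to keep the demo short; a real client would use a Selector.
            while (!connected) {
                connected = client.finishConnect();
            }
            return client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```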

    So a typical interaction workflow is as follows (explained very well here):

    If you're connecting in non-blocking mode you should:

    • register the channel for OP_CONNECT
    • when it fires call finishConnect()
    • if that returns true, deregister OP_CONNECT and register OP_READ or OP_WRITE depending on what you want to do next
    • if it returns false, do nothing, keep selecting
    • if either connect() or finishConnect() throws an exception, close the channel and try again or forget about it or tell the user or
      whatever is appropriate.

    If you don't want to do anything until the channel connects, do the connect in blocking mode and go into non-blocking mode when the connect succeeds.
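    Both variants from the quote can be sketched in plain NIO as follows. This is a minimal illustration under assumptions (a throwaway loopback server, and a client that only wants to write once connected); it is not Kafka's implementation, and ConnectWorkflow and its methods are hypothetical names. The comments map each branch to the bullets above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical helper illustrating the quoted workflow (not Kafka code).
final class ConnectWorkflow {
    /** Non-blocking workflow: returns the interest ops left on the key once connected. */
    static int nonBlockingConnect(Selector selector, SocketChannel client, SocketAddress address)
            throws IOException {
        client.configureBlocking(false);
        if (client.connect(address)) {
            // Connected immediately: there is nothing to finish, so go straight to OP_WRITE.
            return client.register(selector, SelectionKey.OP_WRITE).interestOps();
        }
        client.register(selector, SelectionKey.OP_CONNECT); // register the channel for OP_CONNECT
        while (true) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                // When OP_CONNECT fires, call finishConnect(). An IOException propagates to
                // the caller, which is responsible for closing the channel.
                if (key.isConnectable() && ((SocketChannel) key.channel()).finishConnect()) {
                    key.interestOps(SelectionKey.OP_WRITE); // drop OP_CONNECT, wait for OP_WRITE
                    return key.interestOps();
                }
                // finishConnect() returned false: do nothing and keep selecting.
            }
        }
    }

    /** Blocking alternative: connect first, then switch to non-blocking mode. */
    static int blockingThenNonBlocking(Selector selector, SocketChannel client, SocketAddress address)
            throws IOException {
        client.configureBlocking(true);
        client.connect(address);         // returns only once connected, or throws
        client.configureBlocking(false); // register() requires a non-blocking channel
        return client.register(selector, SelectionKey.OP_WRITE).interestOps();
    }

    /** Runs one of the two variants against a throwaway loopback server. */
    static int demo(boolean blocking) {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SocketAddress address = server.getLocalAddress();
            return blocking ? blockingThenNonBlocking(selector, client, address)
                            : nonBlockingConnect(selector, client, address);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    The blocking variant is simpler, but the calling thread waits for the whole TCP handshake. For a single thread driving many connections, which is how Kafka's Selector is used, the non-blocking path avoids that stall; that is presumably why Kafka takes it.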

    The connect() call may connect immediately, as in the case of a local connection, and in that case the OP_CONNECT interest registered on the socketChannel (a few lines after the connect() call) may never fire, so with typical Java NIO selection code we would miss the channel. We still need to call finishConnect() on such channels eventually (see the second bullet point of the workflow). So Kafka adds the key of such a channel to a separate set, immediatelyConnectedKeys, so that it is processed later; otherwise we would miss it completely.
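    A stripped-down, hypothetical version of this bookkeeping might look like the sketch below. It mirrors the idea described above (remember the keys whose connect() returned true, then process them alongside the selected keys with a flag), but it is not Kafka's actual Selector: MiniSelector, poll(), connectedCount() and runDemo() are illustrative names, and the choice to use selectNow() while immediately connected keys are pending is an assumption of this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified selector illustrating the immediatelyConnectedKeys idea. */
final class MiniSelector implements AutoCloseable {
    private final Selector nioSelector;
    // Keys whose connect() returned true: OP_CONNECT will not fire for them.
    private final Set<SelectionKey> immediatelyConnectedKeys = new HashSet<>();
    private final List<SocketChannel> channels = new ArrayList<>();
    private int connectedCount = 0;

    MiniSelector() throws IOException { nioSelector = Selector.open(); }

    void connect(SocketAddress address) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channels.add(channel);
        channel.configureBlocking(false);
        boolean connected = channel.connect(address);
        SelectionKey key = channel.register(nioSelector, SelectionKey.OP_CONNECT);
        if (connected) {
            // Remember the key and clear its interest set; it is finished in the next poll().
            immediatelyConnectedKeys.add(key);
            key.interestOps(0);
        }
    }

    /** timeoutMs must be positive (select(0) would block indefinitely). */
    void poll(long timeoutMs) throws IOException {
        // Do not block in select() while immediately connected keys are waiting.
        int readyKeys = immediatelyConnectedKeys.isEmpty()
                ? nioSelector.select(timeoutMs)
                : nioSelector.selectNow();
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(nioSelector.selectedKeys(), false);
            pollSelectionKeys(immediatelyConnectedKeys, true);
        }
    }

    private void pollSelectionKeys(Set<SelectionKey> keys, boolean isImmediatelyConnected)
            throws IOException {
        Iterator<SelectionKey> it = keys.iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // neither set is cleared automatically
            SocketChannel channel = (SocketChannel) key.channel();
            // Complete connections that finished either via OP_CONNECT or immediately.
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    key.interestOps(SelectionKey.OP_READ);
                    connectedCount++;
                }
            }
        }
    }

    int connectedCount() { return connectedCount; }

    @Override
    public void close() throws IOException {
        for (SocketChannel channel : channels) channel.close();
        nioSelector.close();
    }

    /** Connects to a throwaway loopback server and polls until the connection completes. */
    static int runDemo() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             MiniSelector selector = new MiniSelector()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            selector.connect(server.getLocalAddress());
            for (int i = 0; i < 50 && selector.connectedCount() == 0; i++) {
                selector.poll(100);
            }
            return selector.connectedCount();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    The boolean flag plays the same role as isImmediatelyConnected in the Kafka snippet: an immediately connected key was never selected, so key.isConnectable() is false for it, and the flag is what routes it to finishConnect().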

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }
    

    And later, in the pollSelectionKeys() method (note the use of channel.finishConnect(), which ends up calling finishConnect() on the underlying SocketChannel):

        /* complete any connections that have finished their handshake (either normally or immediately) */
        if (isImmediatelyConnected || key.isConnectable()) {
            if (channel.finishConnect()) {
                ...
    

    All in all, the Kafka code looks like standard NIO usage, unless there is more to it that the Kafka team can explain. Further good reads on this subject can be found here. An interesting misunderstanding related to this (a bug filed against the JDK and eventually rejected by the JDK team) can be found here.

    For the second part of the question, you are probably asking about the code below: why are there two separate calls for the keys?

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }
    

    So we now maintain two sets of keys. Although selector.keys() provides an overall view of all registered keys, that key set is not directly modifiable, so it is effectively a read-only view: a key is removed from it only after the key has been cancelled and its channel has been deregistered. That is why selector.selectedKeys() is typically used to access the ready channels. Also, selector.selectedKeys() will obviously not return the keys in immediatelyConnectedKeys, since OP_CONNECT never fires for those channels.

    The usual pattern for processing the keys obtained from selector.selectedKeys() is to iterate over the set, test which event the key's channel is ready for (acceptable, connectable, readable or writable), do your work, and then remove the key from the set. This removal is necessary: the Selector does not remove SelectionKey instances from the selected-key set itself, so you have to do it when you are done processing the channel. The next time the channel becomes ready, the Selector will add it to the selected-key set again. That is why both sets are processed, and why the pollSelectionKeys() method is designed to handle both.
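    The key-set rules described above can be checked directly with a small sketch. It assumes a throwaway loopback server and uses an illustrative class name (not Kafka code). It shows that keys() rejects modification, that a ready key stays in the selected-key set across selections until the caller removes it, and that removal from selectedKeys() is allowed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Illustrative check of java.nio.channels.Selector key-set behavior (not Kafka code).
final class KeySetRules {
    /** Returns true if all of the checked key-set rules hold. */
    static boolean check() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            client.connect(server.getLocalAddress()); // blocking connect, for brevity
            client.configureBlocking(false);
            SelectionKey key = client.register(selector, SelectionKey.OP_WRITE);

            // 1. keys() is not directly modifiable.
            boolean keysReadOnly;
            try {
                selector.keys().remove(key);
                keysReadOnly = false;
            } catch (UnsupportedOperationException expected) {
                keysReadOnly = true;
            }

            // 2. A freshly connected socket has send-buffer space, so the key is selected.
            selector.select();
            boolean selected = selector.selectedKeys().contains(key);

            // 3. The selector does not clear the selected-key set: the key is still there.
            selector.selectNow();
            boolean stillSelected = selector.selectedKeys().contains(key);

            // 4. Removal is the caller's job, and it is allowed.
            selector.selectedKeys().remove(key);
            boolean removed = !selector.selectedKeys().contains(key);

            return keysReadOnly && selected && stillSelected && removed;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```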