Tags: apache-kafka, kafka-consumer-api, kafka-producer-api

Commit issue with Kafka


I am working on a module where the requirement is as follows: there is a producer, and we are using Kafka as a queue, producing data into it and feeding it to a consumer.

Now, in the consumer, I am trying to implement an At-Least-Once messaging scenario.

For this I have to poll the messages from Kafka and then consume them. After consuming, I call consumer.commitAsync(offset, callback).

I want to know what will happen in the following cases:

Case 1) commitAsync() is never called (suppose there was an exception just before calling this API). In this case I assumed the message would be delivered to the consumer again, but that is not happening; the consumer never gets that data again.

Case 2) The consumer reboots.

Below is a snippet of the properties set on the consumer:

import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

// heartBeatinterval, metaDataMaxAge, sessionTimeout, autoOffsetReset are class fields
private Properties getConsumerProperties() {
    final Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, "server");
    props.put(GROUP_ID_CONFIG, "groupName");
    // Auto-commit is disabled; offsets are committed manually via commitAsync()
    props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(HEARTBEAT_INTERVAL_MS_CONFIG, heartBeatinterval);
    props.put(METADATA_MAX_AGE_CONFIG, metaDataMaxAge);
    props.put(SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    props.put(AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return props;
}
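
Each consumer is then created from these properties and subscribed, roughly like this (a sketch; the topic name is a placeholder):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // One consumer instance per topic; "topic-a" is a placeholder name
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());
    consumer.subscribe(Collections.singletonList("topic-a"));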

Now, in the consumer, based on some property that is set, I have 3 topics and create 3 consumers for each topic (as each topic has 3 partitions and there are 3 Kafka brokers).

For consumption of the data, I identify the packet based on some property when it is received from Kafka and pass it to the relevant topic handler (I have separate thread pools for the different topics; I create tasks based on the property in the packet and submit them to the thread pool). In the task, after processing, I call consumer.commitAsync(offset, callback).
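
In code, the flow looks roughly like the sketch below (processPacket, the pool size, and the topic name are placeholders for my actual setup):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());
    consumer.subscribe(Collections.singletonList("topic-a"));
    ExecutorService pool = Executors.newFixedThreadPool(3); // one pool per topic

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            pool.submit(() -> {
                processPacket(record); // application-specific processing
                // Commit the offset of the processed record (+1 = next offset to read).
                // Caveat: KafkaConsumer is not thread-safe, so calling commitAsync()
                // from a worker thread like this can fail at runtime.
                consumer.commitAsync(
                        Collections.singletonMap(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)),
                        (offsets, exception) -> { /* log/handle commit failures */ });
            });
        }
    }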

I was expecting the same message to be pulled again from Kafka when commitAsync() was not called for some packet, but to my surprise it is not coming back. Am I missing something? Is there any sort of setting we need to do on the Apache Kafka side as well for At-Least-Once?

Please suggest.


Solution

  • There are a couple of things to be addressed in your question.

    Before I get to the suggestions on how to achieve at-least-once behavior, I'll try and address the 2 cases:

    Case 1) commitAsync() is never called (suppose there was an exception just before calling this API). In this case I assumed the message would be delivered to the consumer again, but that is not happening; the consumer never gets that data again.

    The reason why your consumer does not get the old data could be the enable.auto.commit property: it defaults to true and commits offsets regularly in the background. Because of this, the consumer on subsequent runs will find an offset to work with and will just wait for new data/messages to arrive. Note also that even with auto-commit disabled, a running consumer tracks its position in memory, so successive poll() calls keep moving forward whether or not offsets were committed; an uncommitted message is only re-delivered after a restart or a rebalance (or an explicit seek back).
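
    To illustrate that last point: to re-read a record within the same session after a processing failure, you would have to rewind explicitly (a sketch, assuming record is the failed ConsumerRecord and consumer is your KafkaConsumer):

        import org.apache.kafka.common.TopicPartition;

        // Rewind the partition to the failed record so the next poll() returns it
        // again; without this (or a restart/rebalance), poll() simply continues
        // from the in-memory position.
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        consumer.seek(tp, record.offset());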

    Case 2) The consumer reboots.

    This would be similar, i.e. if the consumer, after rebooting, finds a committed offset to work with, it will start consuming from that offset, whether the offset was committed automatically (enable.auto.commit set to true) or explicitly via commitAsync()/commitSync().
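
    If there is no committed offset for the group (e.g. a brand-new group.id), the auto.offset.reset property decides where the consumer starts, for example:

        // Applies only when the group has no committed offset for a partition:
        // "earliest" re-reads from the beginning of the log, "latest" (the default)
        // skips to the end and waits for new messages.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");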

    Now, moving to the part on how to achieve at-least-once behavior - I could think of the following 2 ways:

    1. If you want to take control of committing offsets, set the enable.auto.commit property to false and then invoke commitSync() or commitAsync(), with retries handled in the callback function (see the sketch after this list).

    Note: The choice of synchronous vs. asynchronous commits will depend on your latency budget and any other requirements, so I won't go into those details here.

    2. The other option is to utilise the automatic offset commit feature, i.e. setting enable.auto.commit to true and auto.commit.interval.ms to an acceptable number (again, based on how often you would like to commit offsets).
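
    For option 1, a minimal at-least-once poll/commit loop could look like this (a sketch; the topic name and process() are placeholders, and error handling for failed records, e.g. seeking back as shown above, is elided):

        import java.time.Duration;
        import java.util.Collections;
        import org.apache.kafka.clients.consumer.*;

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());
        consumer.subscribe(Collections.singletonList("topic-a"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // if this throws, the offsets below are never committed
                }
                // Commit only after the whole batch has been processed. On failure,
                // just log: a later successful commit covers these offsets as well.
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("Commit failed for " + offsets + ": " + exception);
                    }
                });
            }
        } finally {
            consumer.commitSync(); // synchronous commit on shutdown so the final position is not lost
            consumer.close();
        }

    Because the commit happens only after processing, a crash between process() and the commit means those records are read again on restart; duplicates are possible, which is exactly the at-least-once trade-off.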

    I think the default behaviour of Kafka is centered around at-least-once semantics, so it should be fairly straightforward.

    I hope this helps!