Search code examples
apache-kafkaspring-kafka

How to not commit offsets in KafkaListener


I am using KafkaListener and want to have control over commit of Offsets based on whether the message is successfully processed or not. For this I am using below code but it throws exception on nack(index, sleep):

    @KafkaListener(topics="topic-name", groupId="group"){
    someMethod(@Header(KafkaHeaders.OFFSET) int offset, Acknowlegement ack, String message){
       try{
           processMessage(message);
           ack.acknowledge();
       }catch(Exception e){
           ack.nack(offset, 1000);
       }
    }

I have set ackMode as Manual and Auto Commit property is set to false. On failure in message processing nack throws exception:

      Exception: nack(index, sleep) is not supported by this Acknowledgement.

Any other way to handle the scenario is also welcome.


Solution

  • nack(index, sleep) is for batch listeners List<String> messages. The index is to tell the container which message within the list failed.

    For a record listener use nack(sleep) - the container already knows which record failed.

    See the documentation.

    Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.

    With a record listener, when nack() is called, any pending offsets are committed, the remaing records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next poll(). The consumer thread can be paused before redelivery, by setting the sleep argument. This is similar functionality to throwing an exception when the container is configured with a SeekToCurrentErrorHandler.

    When using a batch listener, you can specify the index within the batch where the failure occurred. When nack() is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next poll(). This is an improvement over the SeekToCurrentBatchErrorHandler, which can only seek the entire batch for redelivery.