Search code examples
spring-kafka

Spring Kafka AckOnError


I have configured SeekToErrorHandler with DeadLetterPublisheingRecoverer

 ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(primaryConsumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(true);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);               
    factory.setErrorHandler(new SeekToErrorHandler(new DeadLetterPublisheingRecoverer(kafkaTemplate()),3));

When an exception is thrown from listener (or validator), after three retries, the message gets published to the dead letter.

The issue here is next time when restart my spring boot application (or listener container), same message gets again delivered to the listener and goes through the entire sequence and finally lands on dead letter. Is there any way to avoid this?

I have disabled auto commit and have set AckOnError(false) and AckMode(AckMode.RECORD);

In SeekToErrorHandler, I could find that the logic around SeekToUtil which throws exception until configured number of iterations gets completed and finally calling the accept method of the BiConsumer (deadletter publishing). So the container should commit the record on the final step (on publishing to dead letter) right? I have also gone through the comment on ackOnError(boolean) method in org.springframework.kafka.listener.ContainerProperties

When setAckOnError(true), I could find correct behavior with three retries and finally invoking dead letter publisher. The message not getting re-delivered when listener container restarted

Spring kafka version is 2.2.6


Solution

  • In 2.3 we added ackAfterHandle; with the default being true for the SeekToCurrentErrorHandler.

    @Override
    public boolean isAckAfterHandle() {
        return this.ackAfterHandle;
    }
    
    /**
     * Set to false to tell the container to NOT commit the offset for a recovered record.
     * @param ackAfterHandle false to suppress committing the offset.
     * @since 2.3.2
     */
    public void setAckAfterHandle(boolean ackAfterHandle) {
        this.ackAfterHandle = ackAfterHandle;
    }
    

    In 2.4 it defaults to true for all error handlers.

    https://github.com/spring-projects/spring-kafka/issues/1273