Tags: java, spring, apache-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Is there any way to retry up to maxAttempts times using Acknowledgment.nack with spring-cloud-stream-binder-kafka?


I am trying to consume batches in Kafka, and I found that the documentation says retry is not supported:

Retry within the binder is not supported when using batch mode, so maxAttempts will be overridden to 1. You can configure a SeekToCurrentBatchErrorHandler (using a ListenerContainerCustomizer) to achieve similar functionality to retry in the binder. You can also use a manual AckMode and call Acknowledgment.nack(index, sleep) to commit the offsets for a partial batch and have the remaining records redelivered. Refer to the Spring for Apache Kafka documentation for more information about these techniques.

If I use Acknowledgment.nack(index, sleep), it retries infinitely when an error occurs. Is there any way to retry at most maxAttempts times using Acknowledgment.nack?

The code looks like this:

@StreamListener(Sink.INPUT)
public void consume(@Payload List<PayLoad> payloads,
                    @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
    try {
        consume(payloads); // process the batch (overload defined elsewhere)
        acknowledgment.acknowledge();
    } catch (Exception e) {
        // nack from index 0: the whole batch is redelivered after 50 ms
        acknowledgment.nack(0, 50);
    }
}


Solution

  • There is not; you have to keep track of the retry count yourself.

    However, since version 2.5, you can use a RecoveringBatchErrorHandler, where you throw a BatchListenerFailedException to tell the handler which record failed; it commits the offsets of the records before that one and applies retry logic to the failed record.

    See https://docs.spring.io/spring-kafka/reference/html/#recovering-batch-eh
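If you stay on the nack approach, "keeping track of the retry count yourself" can look like the sketch below: a small helper that counts failures per record key (e.g. topic-partition@offset) and tells the listener whether to nack again or give up. The class and method names here are hypothetical, not part of any Spring API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative helper: tracks attempts per record so a batch listener
// can decide between Acknowledgment.nack(...) and giving up.
public class BatchRetryTracker {
    private final int maxAttempts;
    private final Map<String, Integer> attempts = new ConcurrentHashMap<>();

    public BatchRetryTracker(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Returns true if the record identified by key may be retried again.
    public boolean shouldRetry(String key) {
        int n = attempts.merge(key, 1, Integer::sum);
        if (n >= maxAttempts) {
            // Exhausted: clear state; caller should ack/skip or route to a DLT.
            attempts.remove(key);
            return false;
        }
        return true;
    }

    // Clear state once the record finally succeeds.
    public void onSuccess(String key) {
        attempts.remove(key);
    }

    public static void main(String[] args) {
        BatchRetryTracker tracker = new BatchRetryTracker(3);
        String key = "topic-0@42"; // topic-partition@offset of the failed record
        System.out.println(tracker.shouldRetry(key)); // 1st failure -> retry
        System.out.println(tracker.shouldRetry(key)); // 2nd failure -> retry
        System.out.println(tracker.shouldRetry(key)); // 3rd exhausts maxAttempts
    }
}
```

In the listener's catch block you would then call shouldRetry(...) and either acknowledgment.nack(index, sleep) when it returns true, or acknowledge (skipping the record, or forwarding it somewhere for inspection) when it returns false.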