
Spring cloud Kafka does infinite retry when it fails


Currently, I am having an issue where one of my consumer functions throws an exception, which causes Kafka to retry the same records again and again.

@Bean
public Consumer<List<RuleEngineSubject>> processCohort() {
    return personDtoList -> {
        
        for(RuleEngineSubject subject : personDtoList)
            processSubject(subject);

    };
}

This is the consumer; `processSubject` throws a custom exception, which causes it to fail.

processCohort-in-0:
  destination: internal-process-cohort
  consumer:
    max-attempts: 1
    batch-mode: true
    concurrency: 10
  group: process-cohort-group

The above is my Kafka binding configuration.

Currently, I am attempting to retry two times and then send the failed records to a dead letter queue, but I have been unsuccessful so far and am not sure which approach is the right one to take.

I have tried to implement a custom handler that handles the error without retrying again, but I am not sure how to send the records to a dead letter queue:

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
    return (container, dest, group) -> {
        if (group.equals("process-cohort-group")) {
            container.setBatchErrorHandler(new BatchErrorHandler() {
                @Override
                public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
                    // Log the records of the failed batch and the exception
                    data.records(dest).forEach(r -> System.out.println(r.value()));
                    System.out.println("failed payload: " + thrownException.getLocalizedMessage());
                }
            });
        }
    };
}

This stops the infinite retry but does not send anything to a dead letter queue. Can I get suggestions on how to retry two times and then send the records to a dead letter queue? From my understanding, a batch listener does not know how to recover when there is an error; could someone help shed some light on this?


Solution

  • You need to configure a suitable error handler in the listener container; you can disable retry and DLQ in the binding (keep `max-attempts: 1`) and use a DeadLetterPublishingRecoverer instead. See the answer to Retry max 3 times when consuming batches in Spring Cloud Stream Kafka Binder
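
A minimal sketch of that approach, assuming spring-kafka 2.5+ (where `RetryingBatchErrorHandler` is available) and that a `KafkaOperations` (e.g. `KafkaTemplate`) bean exists in the context; the group name mirrors the question, and the class name is hypothetical:

```java
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.RetryingBatchErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
            KafkaOperations<Object, Object> template) {
        return (container, dest, group) -> {
            if (group.equals("process-cohort-group")) {
                // Once retries are exhausted, publish the records of the failed
                // batch to <original-topic>.DLT (the recoverer's default naming).
                DeadLetterPublishingRecoverer recoverer =
                        new DeadLetterPublishingRecoverer(template);
                // FixedBackOff(0L, 2L): no delay between attempts, 2 retries
                // after the initial delivery (3 attempts in total).
                container.setBatchErrorHandler(
                        new RetryingBatchErrorHandler(new FixedBackOff(0L, 2L), recoverer));
            }
        };
    }
}
```

Note that `max-attempts: 1` must stay in the binding so the binder's own retry template does not wrap the container-level retries, and the `.DLT` topic must exist (or topic auto-creation must be enabled) for the recoverer to publish to it.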