Tags: java, spring, apache-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Retry max 3 times when consuming batches in Spring Cloud Stream Kafka Binder


I am consuming batches in Kafka. Retry is not supported by the Spring Cloud Stream Kafka binder in batch mode; instead, the documentation suggests: "You can configure a SeekToCurrentBatchErrorHandler (using a ListenerContainerCustomizer) to achieve similar functionality to retry in the binder."

I tried that, but with SeekToCurrentBatchErrorHandler the batch is retried more than the limit I set, which is 3 times.

  1. How can I limit the retries to 3 attempts? I would like to retry the whole batch.

  2. How can I send the whole batch to a DLQ topic? With a record listener, I used to check whether deliveryAttempt (the retry count) had reached 3 and then send the message to the DLQ topic; see the check in the listener below.

I have checked this link, which describes exactly my issue, but an example would be a great help. Can I achieve this with the spring-cloud-stream-kafka-binder library? Please explain with an example; I am new to this.

Currently I have the code below.

import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class ConsumerConfig {

  @Bean
  public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
    return (container, dest, group) -> {
      container.getContainerProperties().setAckOnError(false);

      // intended: no delay between attempts, at most 2 retries
      SeekToCurrentBatchErrorHandler seekToCurrentBatchErrorHandler =
          new SeekToCurrentBatchErrorHandler();
      seekToCurrentBatchErrorHandler.setBackOff(new FixedBackOff(0L, 2L));
      container.setBatchErrorHandler(seekToCurrentBatchErrorHandler);
      //container.setBatchErrorHandler(new BatchLoggingErrorHandler());
    };
  }
}

Listener:

  @StreamListener(ActivityChannel.INPUT_CHANNEL)
  public void handleActivity(List<Message<Event>> messages,
          @Header(name = KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment,
          @Header(name = "deliveryAttempt", defaultValue = "1") int deliveryAttempt) {
    try {
      log.info("Received activity message with message length {}", messages.size());
      nodeConfigActivityBatchProcessor.processNodeConfigActivity(messages);
      acknowledgment.acknowledge();
      log.debug("Processed activity message {} successfully!!", messages.size());
    } catch (MessagePublishException e) {
      if (deliveryAttempt == 3) {
        log.error(
            String.format("Exception occurred, sending the message=%s to DLQ due to: ",
                "message"),
            e);
        publisher.publishToDlq(EventType.UPDATE_FAILED, "message", e.getMessage());
      } else {
        throw e;
      }
    }
  }

After seeing @Gary's response, I added the ListenerContainerCustomizer @Bean with a RetryingBatchErrorHandler, but I am not able to import the class. Attaching screenshots.

(screenshot: not able to import RetryingBatchErrorHandler)

(screenshot: my Spring Cloud dependencies)


Solution

  • Use a RetryingBatchErrorHandler to send the whole batch to the DLT:

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh

  • Use a RecoveringBatchErrorHandler where you can throw a BatchListenerFailedException to tell it which record in the batch failed (a sketch follows this list):

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh

  • In both cases, provide a DeadLetterPublishingRecoverer to the error handler and disable DLQs in the binder.
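
    Here is a minimal sketch of the RecoveringBatchErrorHandler route, assuming a sufficiently recent spring-kafka (2.5+) on the classpath; if these classes cannot be imported, the binder is likely pulling in an older spring-kafka version. The class name, the error-topic naming, and the process() method are illustrative placeholders, not part of the original answer:

    import java.util.List;
    import java.util.function.Consumer;

    import org.apache.kafka.common.TopicPartition;
    import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;
    import org.springframework.kafka.listener.BatchListenerFailedException;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.RecoveringBatchErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    @Configuration
    public class RecoveringBatchConfig {

        @Bean
        ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> recoveringCustomizer(
                KafkaTemplate<byte[], byte[]> template) {

            return (container, dest, group) -> {
                // After retries are exhausted, publish the failing record to
                // "errors.<destination>.<group>" on the same partition.
                DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                        (rec, ex) -> new TopicPartition("errors." + dest + "." + group, rec.partition()));
                // FixedBackOff(5000L, 2L): 5s between attempts, 2 retries => 3 deliveries in total
                container.setBatchErrorHandler(new RecoveringBatchErrorHandler(recoverer,
                        new FixedBackOff(5000L, 2L)));
            };
        }

        @Bean
        public Consumer<List<String>> input() {
            return list -> {
                for (int i = 0; i < list.size(); i++) {
                    try {
                        process(list.get(i)); // placeholder for per-record logic
                    }
                    catch (Exception e) {
                        // Points the error handler at the failing record: offsets
                        // before index i are committed; the rest are retried.
                        throw new BatchListenerFailedException("failed record", e, i);
                    }
                }
            };
        }

        private void process(String record) {
            // hypothetical business logic
        }
    }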

    EDIT

    Here's an example; it uses the newer functional style rather than the deprecated @StreamListener, but the same concepts apply (you should consider moving to the functional style).

    import java.util.List;
    import java.util.function.Consumer;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.RetryingBatchErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    @SpringBootApplication
    public class So69175145Application {

        public static void main(String[] args) {
            SpringApplication.run(So69175145Application.class, args);
        }

        @Bean
        ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
                KafkaTemplate<byte[], byte[]> template) {

            return (container, dest, group) -> {
                // FixedBackOff(5000L, 2L): 2 retries, 5s apart => 3 delivery attempts in total
                container.setBatchErrorHandler(new RetryingBatchErrorHandler(new FixedBackOff(5000L, 2L),
                        new DeadLetterPublishingRecoverer(template,
                                (rec, ex) -> new TopicPartition("errors." + dest + "." + group, rec.partition()))));
            };

        }

        /*
         * DLT topic won't be auto-provisioned since enableDlq is false
         */
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("errors.so69175145.grp").partitions(1).replicas(1).build();
        }

        /*
         * Functional equivalent of @StreamListener
         */
        @Bean
        public Consumer<List<String>> input() {
            return list -> {
                System.out.println(list);
                throw new RuntimeException("test");
            };
        }

        /*
         * Not needed here - just to show we sent them to the DLT
         */
        @KafkaListener(id = "so69175145", topics = "errors.so69175145.grp")
        public void listen(String in) {
            System.out.println("From DLT: " + in);
        }

    }
    
    spring.cloud.stream.bindings.input-in-0.destination=so69175145
    spring.cloud.stream.bindings.input-in-0.group=grp
    spring.cloud.stream.bindings.input-in-0.content-type=text/plain
    
    spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
    
    # for DLT listener
    spring.kafka.consumer.auto-offset-reset=earliest
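
    Note that no binder-level DLQ is configured above; the Kafka binder's enableDlq consumer property defaults to false, so the error handler's DeadLetterPublishingRecoverer is the only component publishing to the DLT. If it had been enabled previously, it would need to be switched off; the line below is illustrative, matching the binding name used above:

    # keep the binder's own DLQ support off; the error handler owns DLT publishing
    spring.cloud.stream.kafka.bindings.input-in-0.consumer.enableDlq=false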
    
    With FixedBackOff(5000L, 2L), the batch is delivered three times in total (note the timestamps 5 seconds apart) and then published to the DLT:

    [foo]
    2021-09-14 09:55:32.838 ERROR ...
    ...
    [foo]
    2021-09-14 09:55:37.873 ERROR ...
    ...
    [foo]
    2021-09-14 09:55:42.886 ERROR ...
    ...
    From DLT: foo