Search code examples
javaspringspring-bootspring-kafkaspring-cloud-stream

Error handling in Spring Cloud Stream Kafka in Batch mode


I am using Spring Cloud Stream and Kafka Binder to consume messages in batches from a Kafka Topic. I am trying to implement an error handling mechanism. As per my understanding I can't use Spring Cloud Stream's enableDLQ property in batch mode.

I have found RecoveringBatchErrorHandler and DeadLetterPublishingRecoverer to retry and send failure messages from the spring-kafka documentation. But I am not able to understand how to send the records to a custom DLQ topic following the functional programming standards. All the examples I can see is using KafkaTemplates.

Are there any good example where I can find the implementation?

This is the spring doc I have been referring to.

https://docs.spring.io/spring-kafka/docs/2.5.12.RELEASE/reference/html/#recovering-batch-eh


Solution

  • That version is no longer supported as OSS https://spring.io/projects/spring-kafka#support

    With the current version, use the DefaultErrorHandler configured with a DeadLetterPublishingRecoverer and throw a BatchListenerExcecutionFailedException to tell the framework which record in the batch failed.

    See https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling and https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters and https://docs.spring.io/spring-kafka/docs/current/reference/html/#legacy-eh

    Add a ListenerContainerCustomizer bean to add your configured error handler to the listener container.