Tags: spring, apache-kafka, spring-cloud-stream

spring kafka cloud stream: limit retry attempts in batch mode


When an exception is thrown while consuming a message, Spring tries to read the same message over and over, and consumption of other messages basically stops. I've tried setting the defaultRetryable and retryableExceptions properties like this:

spring:
  cloud.stream:
    bindings:
      consumer-in-0:
        consumer:
          defaultRetryable: false
          retryable-exceptions:
            org.springframework.dao.DataIntegrityViolationException: false

as described in the reference documentation: https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_retry_template_and_retrybackoff

but it had no effect. How can I disable repeated attempts to read a failed message, or at least limit the number of such attempts?

Update

Looking at the Spring source of KafkaMessageChannelBinder:

protected MessageProducer createConsumerEndpoint() {
    // ...
    if (!extendedConsumerProperties.isBatchMode()
            && extendedConsumerProperties.getMaxAttempts() > 1
            && transMan == null) {

        kafkaMessageDrivenChannelAdapter
                .setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
        // ...
    }
    // ...
}
so it looks like the mentioned properties only apply when batch mode is disabled, but I am using batch mode (batchMode == true). How can I handle retries in batch mode?


Solution

  • Batch listeners retry forever, by default, because the framework can't tell which record in the batch failed.

    It's best to handle errors in batch mode in the listener itself; see the first sketch below.

    Alternatively, you can add a ListenerContainerCustomizer bean to configure a different BatchErrorHandler (second sketch below). See https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling for the available options.
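
    Here is a minimal sketch of the listener-side approach, assuming a functional batch consumer with a String payload (the bean name and payload type are illustrative, not from the question). Each record is processed in its own try/catch, so one bad record cannot force endless redelivery of the whole batch:

    import java.util.List;
    import java.util.function.Consumer;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class BatchConsumerConfig {

        private static final Logger log = LoggerFactory.getLogger(BatchConsumerConfig.class);

        // Requires batch-mode: true on the binding so the binder delivers a List.
        @Bean
        public Consumer<List<String>> consumer() {
            return batch -> {
                for (String record : batch) {
                    try {
                        process(record); // business logic, may throw
                    }
                    catch (Exception ex) {
                        // Log and skip the bad record so the whole batch
                        // is not redelivered endlessly.
                        log.error("Skipping failed record: {}", record, ex);
                    }
                }
            };
        }

        private void process(String record) {
            // ...
        }
    }

    Failed records are simply logged and skipped here; in practice you might publish them to a dead-letter topic instead.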
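
    And a sketch of the customizer approach, assuming a spring-kafka version that still provides RetryingBatchErrorHandler (newer versions supersede it with DefaultErrorHandler); it retries the whole batch a limited number of times and then passes each record to a recoverer. The back-off values are illustrative:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;
    import org.springframework.kafka.listener.RetryingBatchErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    @Configuration
    public class KafkaErrorHandlingConfig {

        private static final Logger log = LoggerFactory.getLogger(KafkaErrorHandlingConfig.class);

        @Bean
        public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
            return (container, destinationName, group) -> {
                // Retry the failed batch twice, 1s apart (3 delivery attempts
                // in total), then hand each record to the recoverer and move on.
                container.setBatchErrorHandler(new RetryingBatchErrorHandler(
                        new FixedBackOff(1000L, 2L),
                        (record, ex) -> log.error("Giving up on {}", record, ex)));
            };
        }
    }

    With FixedBackOff(1000L, 2L) the batch is redelivered at most twice after the initial failure; after that, each record is passed to the recoverer instead of being retried forever.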