Tags: java, spring-boot, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring Cloud Stream Kafka consumer in batch mode is not retrying


I'm using Spring Cloud Stream (version 4.0.3) with the Kafka binder in a Spring Boot application to consume messages in batches from a Kafka topic. When an exception is thrown, the entire batch is sent to the DLQ topic without any retries. Please help me find the issue.

Below is my retry and DLQ configuration

import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaRetryConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(KafkaOperations<Object, Object> bytesTemplate) {
        return (container, destinationName, group) -> {
            // Retry up to 5 times, 5 seconds apart, then publish the failed record to the DLQ.
            container.setCommonErrorHandler(new DefaultErrorHandler(new DeadLetterPublishingRecoverer(bytesTemplate), new FixedBackOff(5000L, 5L)));
        };
    }
}
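
For debugging, spring-kafka's DefaultErrorHandler also accepts RetryListener callbacks via setRetryListeners(...). Below is a minimal sketch of the same customizer with one attached, so each failed delivery attempt shows up in the logs before anything reaches the DLQ (the logger name is illustrative, not from my real config):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(KafkaOperations<Object, Object> bytesTemplate) {
    Logger log = LoggerFactory.getLogger("kafka-retry-debug"); // illustrative logger name
    return (container, destinationName, group) -> {
        DefaultErrorHandler handler = new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(bytesTemplate), new FixedBackOff(5000L, 5L));
        // Logs every failed delivery attempt, making it easy to see whether the
        // back-off retries run at all before the recoverer publishes to the DLQ.
        handler.setRetryListeners((record, ex, attempt) ->
                log.warn("delivery attempt {} failed for {}-{}@{}", attempt,
                        record.topic(), record.partition(), record.offset(), ex));
        container.setCommonErrorHandler(handler);
    };
}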

Below is the Kafka consumer code

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@Bean
public Consumer<Message<List<records>>> recordsConsumer() {
    return message -> {
        List<records> records = message.getPayload();
        // Find the index of the record that should simulate a failure.
        int index = IntStream.range(0, records.size())
                .filter(streamIndex -> records.get(streamIndex).getId().equals("abc123"))
                .findFirst()
                .orElse(-1);
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        assert acknowledgment != null;
        try {
            if (index > -1) {
                throw new RuntimeException("runtime exception");
            }
            // message processing logic
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Report the failing record's index so the error handler can retry
            // from it; the original exception is passed as the cause.
            throw new BatchListenerFailedException(records.get(index).toString(), e, index);
        }
    };
}
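
For comparison, here is a minimal sketch of the per-record failure pattern spring-kafka expects from batch listeners: throw BatchListenerFailedException with the failing index and the original exception as the cause, and let the container commit when the listener returns normally. MyRecord and process(...) are placeholders, not types from my application:

import java.util.List;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.messaging.Message;

@Bean
public Consumer<Message<List<MyRecord>>> recordsConsumer() {
    return message -> {
        List<MyRecord> records = message.getPayload();
        for (int i = 0; i < records.size(); i++) {
            try {
                process(records.get(i)); // placeholder for the real processing logic
            } catch (Exception e) {
                // The error handler commits offsets for the records before this
                // index and retries the batch starting at the failed record.
                throw new BatchListenerFailedException("failed at index " + i, e, i);
            }
        }
        // No manual acknowledge here: with the default ack mode, the container
        // commits once the listener returns without throwing.
    };
}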

Below are my application properties

spring:
  cloud:
    stream:
      default-binder: kafka
      default:
        contentType: application/*+avro
        consumer:
          useNativeDecoding: true
          autoStartup: false
        producer:
          useNativeEncoding: true
      kafka:
        binder:
          autoCreateTopics: false
          brokers: broker
          configuration:
            enable:
              auto.commit: false
              idempotence: true
            max.in.flight.requests.per.connection: 1
            request.timeout.ms: 5000
            security.protocol: SASL_SSL
            sasl:
              kerberos:
                service:
                  name: service-name
              jaas:
                config: com.sun.security.auth.module.Krb5LoginModule required
                  doNotPrompt=true
                  useKeyTab=true
                  useTicketCache=false
                  storeKey=true
                  keyTab="xyz.keytab"
                  principal="[email protected]";
            ssl:
              endpoint.identification.algorithm:
              truststore:
                type: JKS
                location: /config/global/payx-cacerts/cacerts
                password: changeit
          consumer-properties:
            client.id: hrs-productsubscription-consumer-test-9
            key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
            schema.registry.url: schema-registry-url
            #fetch.max.wait.ms: 60000
            max.poll.records: 200
          requiredAcks: -1
          producer-properties:
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        bindings:
          recordsConsumer-in-0:
            consumer:
              #ackMode: MANUAL
              startOffset: earliest
              resetOffsets: false
              autoCommitOffset: false
              enableDlq: true
              dlqName: dlq-topic-name
              dlqPartitions: 1
              dlqProducerProperties:
                configuration:
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
                  value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  schema.registry.url: schema-registry-url
              configuration:
                group.id: group-id
                schema.registry.url: schema-registry-url
                autoStartup: true
                key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring:
                  deserializer:
                    key:
                      delegate.class: org.apache.kafka.common.serialization.StringDeserializer
                    value:
                      delegate.class: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
      bindings:
        recordsConsumer-in-0:
          consumer:
            batch-mode: true
            max-attempts: 2
          destination: topic-name
          group: group-name
          partitioned: true
          concurrency: 8

The retry configuration in my app doesn't seem to be working.


Solution

  • Please review the answer on this SO thread and see if it is related: Spring Cloud Stream Kafka Binder - Retries not working when using DLQ in Batch Mode

    It looks like this is due to a similar issue. Please update to the latest version of the binder (4.1.2), as there were some fixes in this area on the 4.1.x line.
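
    After bumping the version, a quick throwaway check (a sketch, not part of the fix itself) is to read the binder jar's manifest version at runtime to confirm the upgrade actually landed on the classpath; this assumes the jar ships an Implementation-Version manifest entry, which Spring artifacts normally do:

    import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;

    public class BinderVersionCheck {
        public static void main(String[] args) {
            // Reads Implementation-Version from the binder jar's manifest;
            // prints e.g. "4.1.2" if the upgrade took effect, or null if
            // the manifest entry is absent.
            String version = KafkaMessageChannelBinder.class.getPackage().getImplementationVersion();
            System.out.println("spring-cloud-stream-binder-kafka version: " + version);
        }
    }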