Tags: spring-boot, apache-kafka, kafka-consumer-api, spring-cloud-dataflow, spring-retry

Different retry strategies for different consumers in Kafka


We are dealing with a scenario where we need different retry strategies for different consumers in the same application.

Please refer to the figure below (a brief architecture diagram):

(Architecture diagram: main_topic → main_consumer → API; on failure the payload goes to error_topic; error_topic → error_consumer → API with 3 retries; on repeated failure the payload goes to the DLQ.)

The main_consumer consumes payloads from main_topic and tries to send them to an API. If the API call fails, we write the failed payload to another topic called error_topic. A different consumer (error_consumer) consumes from error_topic and sends the payload to the API again, with 3 retry attempts. If it still fails, error_consumer pushes the payload to a DLQ.
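The intended routing can be sketched as a plain-Java simulation (illustrative only; the class and method names below are made up and are not the actual Spring Cloud Stream beans or channels):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simulates the desired behavior: main_consumer never retries,
// error_consumer retries up to 3 times and then dead-letters.
class RetryFlowSketch {

    final List<String> errorTopic = new ArrayList<>();
    final List<String> dlq = new ArrayList<>();

    // main_consumer: a single attempt (maxAttempts = 1); failures are
    // forwarded to error_topic instead of being retried.
    void mainConsume(String payload, Predicate<String> api) {
        if (!api.test(payload)) {
            errorTopic.add(payload);
        }
    }

    // error_consumer: up to 3 attempts (maxAttempts = 3); if all fail,
    // the payload is dead-lettered.
    void errorConsume(String payload, Predicate<String> api) {
        for (int attempt = 1; attempt <= 3; attempt++) {
            if (api.test(payload)) {
                return;
            }
        }
        dlq.add(payload);
    }

    public static void main(String[] args) {
        RetryFlowSketch flow = new RetryFlowSketch();
        Predicate<String> failingApi = p -> false;

        flow.mainConsume("test", failingApi);   // one attempt, then error_topic
        for (String p : new ArrayList<>(flow.errorTopic)) {
            flow.errorConsume(p, failingApi);   // three attempts, then DLQ
        }
        System.out.println("error_topic: " + flow.errorTopic);
        System.out.println("dlq: " + flow.dlq);
    }
}
```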

Problem we are facing:

We need main_consumer not to retry on failure, and error_consumer to retry 3 times on failure. We set maxAttempts to 1 for main_consumer and maxAttempts to 3 for error_consumer. But with this configuration, main_consumer retries 3 times and error_consumer only once. It works exactly opposite to what we expected.

P.S.: We also tried interchanging the maxAttempts values between the two consumers (which is illogical), to no avail.

Below is the Spring Cloud Stream application configuration we are using:

We run the application with both of the profiles below.

application-main.yml

spring:
  cloud:
    stream:
      kafka:
        bindings:
          main-consumer-channel:
            consumer:
              autoCommitOffset: false
      bindings:
        main-consumer-channel:
          destination: main_topic
          consumer:
            maxAttempts: 1
            backOffInitialInterval: 5000
            backOffMultiplier: 2

application-error-retry.yml

spring:
  cloud:
    stream:
      kafka:
        bindings:
          error-consumer-channel:
            consumer:
              autoCommitOffset: false
      bindings:
        error-consumer-channel:
          destination: error_topic
          consumer:
            maxAttempts: 3
            backOffInitialInterval: 5000
            backOffMultiplier: 2
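For reference, since both profiles are active at the same time, Spring Boot merges their properties into one environment; the binding names differ, so the keys do not collide. The effective merged configuration is equivalent to this single file (shown only to rule out profile-merging confusion):

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          main-consumer-channel:
            consumer:
              autoCommitOffset: false
          error-consumer-channel:
            consumer:
              autoCommitOffset: false
      bindings:
        main-consumer-channel:
          destination: main_topic
          consumer:
            maxAttempts: 1
            backOffInitialInterval: 5000
            backOffMultiplier: 2
        error-consumer-channel:
          destination: error_topic
          consumer:
            maxAttempts: 3
            backOffInitialInterval: 5000
            backOffMultiplier: 2
```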

Solution

  • This works fine for me...

    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.messaging.MessageChannel;

    @SpringBootApplication
    @EnableBinding(Inputs.class)
    public class So57522645Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So57522645Application.class, args);
        }
    
        @StreamListener("input1")
        public void listen1(String in) {
            System.out.println("main: " + in);
            throw new RuntimeException("fail");
        }
    
        @StreamListener("input2")
        public void listen2(String in) {
            System.out.println("error: " + in);
            throw new RuntimeException("fail");
        }
    
        @StreamListener("input3")
        public void listen3(String in) {
            System.out.println("final: " + in);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
            return args -> template.send("main", "test".getBytes());
        }
    
    }
    
    interface Inputs {
    
        @Input
        MessageChannel input1();
    
        @Input
        MessageChannel input2();
    
        @Input
        MessageChannel input3();
    
    }
    
    spring:
      cloud:
        stream:
          bindings:
            input1:
              consumer:
                max-attempts: 1
              destination: main
              group: grp1
            input2:
              consumer:
                max-attempts: 3
              destination: error.main.grp1
              group: grp2
            input3:
              destination: error.error.main.grp1.grp2
              group: grp3
          kafka:
            bindings:
              input1:
                consumer:
                  enable-dlq: true
              input2:
                consumer:
                  enable-dlq: true
    

    and the output shows main_consumer attempting once, error_consumer three times, and the record then landing in the final DLQ:

    main: test
    error: test
    error: test
    error: test
    final: test
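Note the topic names in the configuration above: with enable-dlq, the Kafka binder publishes records that exhaust maxAttempts to a topic named error.<destination>.<group> by default. That is why input2 reads from error.main.grp1 (the DLQ of main/grp1) and input3 from error.error.main.grp1.grp2. If you want to keep your own names such as error_topic, the binder's dlqName consumer property overrides the default; a hedged sketch (the dlqName value here is illustrative):

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          input1:
            consumer:
              enable-dlq: true
              # illustrative override; without it the default error.main.grp1 is used
              dlqName: error_topic
```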