We are dealing with a scenario where we need different retry strategies for different consumers in the same application.
Please refer to the figure below (a brief architecture diagram):
The main_consumer consumes payloads from main_topic and tries to send them to an API. If the API processing fails, we write the failed payload to another topic called error_topic. A different consumer (error_consumer) consumes from error_topic and sends the payload to the API again, with 3 retry attempts. If it still fails, error_consumer pushes the payload into the DLQ.
Problem we are facing:
We need main_consumer not to retry on failure and error_consumer to retry 3 times on failure. We set maxAttempts to 1 for main_consumer and maxAttempts to 3 for error_consumer. But with this configuration, main_consumer retries 3 times and error_consumer only once, which is the opposite of what we expected.
P.S.: We tried swapping the maxAttempts values for both consumers (which is illogical), in vain.
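The intended behaviour described above can be modelled in plain Java, without Spring or Kafka. This is only a sketch: the class `RetryFlowSketch`, its methods, and the in-memory lists standing in for error_topic and the DLQ are hypothetical names introduced for illustration. It assumes that maxAttempts counts the first call, so a value of 1 means no retry at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the intended flow; all names here are hypothetical.
class RetryFlowSketch {
    final List<String> errorTopic = new ArrayList<>(); // stands in for error_topic
    final List<String> dlq = new ArrayList<>();        // stands in for the DLQ
    int apiCalls = 0;                                   // total API invocations

    // Invokes the API up to maxAttempts times in total (first call included).
    boolean deliver(String payload, int maxAttempts, Predicate<String> api) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            apiCalls++;
            if (api.test(payload)) {
                return true;
            }
        }
        return false;
    }

    // main_consumer: a single attempt, then hand off to error_topic.
    void mainConsumer(String payload, Predicate<String> api) {
        if (!deliver(payload, 1, api)) {
            errorTopic.add(payload);
        }
    }

    // error_consumer: three attempts in total, then hand off to the DLQ.
    void errorConsumer(Predicate<String> api) {
        for (String payload : errorTopic) {
            if (!deliver(payload, 3, api)) {
                dlq.add(payload);
            }
        }
        errorTopic.clear();
    }

    public static void main(String[] args) {
        RetryFlowSketch flow = new RetryFlowSketch();
        Predicate<String> alwaysFails = payload -> false; // simulated failing API
        flow.mainConsumer("test", alwaysFails);
        flow.errorConsumer(alwaysFails);
        System.out.println("API calls: " + flow.apiCalls + ", DLQ: " + flow.dlq);
    }
}
```

With an API that always fails, the expected count is one call from main_consumer plus three from error_consumer, after which the payload lands in the DLQ.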
Below is the Spring Cloud Stream application configuration we are using.
We are running the application with both of the profiles below.
application-main.yml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          main-consumer-channel:
            consumer:
              autoCommitOffset: false
      bindings:
        main-consumer-channel:
          destination: main_topic
          consumer:
            maxAttempts: 1
            backOffInitialInterval: 5000
            backOffMultiplier: 2
application-error-retry.yml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          error-consumer-channel:
            consumer:
              autoCommitOffset: false
      bindings:
        error-consumer-channel:
          destination: error_topic
          consumer:
            maxAttempts: 3
            backOffInitialInterval: 5000
            backOffMultiplier: 2
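To make the back-off settings above concrete, here is a rough sketch of the wait times they would produce between attempts. It assumes the usual exponential back-off behaviour (first wait equals the initial interval, each later wait is multiplied, and waits are capped at a maximum) and assumes a cap of 10000 ms, which I believe is the default for backOffMaxInterval since the configuration does not set it. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper: computes the waits between consecutive attempts.
class BackOffScheduleSketch {

    // maxAttempts total tries means maxAttempts - 1 waits in between.
    static long[] waits(int maxAttempts, long initialInterval,
                        double multiplier, long maxInterval) {
        long[] result = new long[Math.max(0, maxAttempts - 1)];
        double next = initialInterval;
        for (int i = 0; i < result.length; i++) {
            result[i] = (long) Math.min(next, maxInterval); // cap at maxInterval
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // error-consumer-channel: maxAttempts 3, 5000 ms initial, multiplier 2
        System.out.println(Arrays.toString(waits(3, 5000, 2.0, 10000)));
        // main-consumer-channel: maxAttempts 1, so there is nothing to wait for
        System.out.println(Arrays.toString(waits(1, 5000, 2.0, 10000)));
    }
}
```

Under these assumptions the error consumer would wait 5000 ms and then 10000 ms between its three attempts, while the back-off values on the main consumer would have no effect because it never retries.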
This works fine for me...
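import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
@EnableBinding(Inputs.class)
public class So57522645Application {

    public static void main(String[] args) {
        SpringApplication.run(So57522645Application.class, args);
    }

    // Main consumer: always fails, max-attempts is 1
    @StreamListener("input1")
    public void listen1(String in) {
        System.out.println("main: " + in);
        throw new RuntimeException("fail");
    }

    // Error consumer: always fails, max-attempts is 3
    @StreamListener("input2")
    public void listen2(String in) {
        System.out.println("error: " + in);
        throw new RuntimeException("fail");
    }

    // Reads from the second dead-letter topic
    @StreamListener("input3")
    public void listen3(String in) {
        System.out.println("final: " + in);
    }

    // Sends one test record to the main topic on startup
    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> template.send("main", "test".getBytes());
    }

}

interface Inputs {

    @Input
    MessageChannel input1();

    @Input
    MessageChannel input2();

    @Input
    MessageChannel input3();

}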
spring:
  cloud:
    stream:
      bindings:
        input1:
          consumer:
            max-attempts: 1
          destination: main
          group: grp1
        input2:
          consumer:
            max-attempts: 3
          destination: error.main.grp1
          group: grp2
        input3:
          destination: error.error.main.grp1.grp2
          group: grp3
      kafka:
        bindings:
          input1:
            consumer:
              enable-dlq: true
          input2:
            consumer:
              enable-dlq: true
and the console output:
main: test
error: test
error: test
error: test
final: test
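The destinations of input2 and input3 in the configuration above follow the Kafka binder's default dead-letter topic naming, error.<destination>.<group>, which applies when enable-dlq is true and no dlqName is set. A small sketch of that naming rule (the class and method names are hypothetical, introduced only for illustration):

```java
// Hypothetical helper mirroring the binder's default DLQ topic naming.
class DlqNameSketch {

    // Default dead-letter topic name: error.<destination>.<group>
    static String defaultDlqName(String destination, String group) {
        return "error." + destination + "." + group;
    }

    public static void main(String[] args) {
        // DLQ of input1 (destination main, group grp1), consumed by input2
        String firstDlq = defaultDlqName("main", "grp1");
        System.out.println(firstDlq);
        // DLQ of input2 (group grp2), consumed by input3
        System.out.println(defaultDlqName(firstDlq, "grp2"));
    }
}
```

This is why one "main" line is followed by three "error" lines and then one "final" line: the single failed attempt on input1 dead-letters the record to the topic input2 reads, and the three failed attempts on input2 dead-letter it to the topic input3 reads.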