Lets say I have the following listener:
@KafkaListener(topics = "${some_topic}", autoStartup = "true")
public void listenForMessage(String message) {
log.warn("Accepted message");
if (true) {
throw new RuntimeException("some error");
}
}
I try to send message to the topic ${some_topic}
and see following:
2023-06-15 10:00:03.311 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
2023-06-15 10:00:03.381 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
2023-06-15 10:00:03.399 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
2023-06-15 10:00:03.414 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
2023-06-15 10:00:03.430 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
2023-06-15 10:00:03.445 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
2023-06-15 10:00:03.460 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
2023-06-15 10:00:03.477 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
2023-06-15 10:00:03.492 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
2023-06-15 10:00:03.509 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
2023-06-15 10:00:03.509 ERROR 33580 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler : Records discarded: my-topic-0@33
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.****.listenForMessage(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: some error; nested exception is java.lang.RuntimeException: Ololo
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2871) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2407) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2377) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$46(KafkaMessageListenerContainer.java:2419) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:185) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.FallbackBatchErrorHandler.handle(FallbackBatchErrorHandler.java:131) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:160) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.FailedBatchProcessor.fallback(FailedBatchProcessor.java:191) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.FailedBatchProcessor.handle(FailedBatchProcessor.java:159) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:150) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:182) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2417) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:2095) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2074) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1429) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1393) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1290) ~[spring-kafka-2.9.7.jar:2.9.7]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:?]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2387) ~[spring-kafka-2.9.7.jar:2.9.7]
Caused by: java.lang.RuntimeException: some error
at ***(MyServiceImpl.java:59) ~[classes/:?]
at ***.listenForMessage(MyServiceImpl.java:39) ~[classes/:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.26.jar:5.3.26]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.26.jar:5.3.26]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.9.7.jar:2.9.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2387) ~[spring-kafka-2.9.7.jar:2.9.7]
We see that listener tries to accept message 10 times and after fail #10 it prints error (and sends ack as far as I understand). I wanted to play with that behaviout
I tried to implement following errorHandler to decrease retry amount to 3:
@Bean
public KafkaListenerErrorHandler eh() {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 3) {
return "FAILED";
}
throw ex;
};
}
and reference to it
@KafkaListener(topics = "${some_topic}", autoStartup = "true", errorHandler = "eh")
public void listenForMessage(String message) {
log.warn("Accepted message");
if (true) {
throw new RuntimeException("some error");
}
}
But msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class)
throws exception. So the behaviour is the same: 10 attempts to accept and then I see exception:
...
Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "org.springframework.messaging.MessageHeaders.get(Object, java.lang.Class)" is null
As far I understand there is a way to mark some exceptions retriable and some - not. Could you please provide example of configuration ?
See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling
and
https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh
Add a DefaultErrorHandler
with a different FixedBackOff
(the default is 9 retries with no back off between attempts).
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
You have to enable the delivery attempts header, it is not enabled by default.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#delivery-header