I'm using Spring's SeekToCurrentErrorHandler with DeadLetterPublishingRecoverer.
A message is logged each time the error handler is called, and an event is also sent to our monitoring system.
The issue I'm seeing is that when consecutive records fail, the error handler is invoked more times than expected.
For example, with a non-retryable exception, if I produce 2 failing messages I get the following logs:
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error log
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| <== my error log
|INFO|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] o.a.k.c.c.KafkaConsumer:1603|orgTest|projectTest|dev|input| [Consumer clientId=consumer-debug-2, groupId=debug] Seeking to offset 5 for partition debug-2
DEBUG|||2020-10-22 13:55:42 [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
|ERROR|Y|f0ecbdfa-3a13-4435-9cd1-b3daf73d324d|2020-10-22 13:55:42 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error logs
DEBUG|||2020-10-22 13:55:42 [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
It seems that for x failing records I get x calls to the error handler for the first failure, x-1 calls for the next, x-2 after that, and so on.
The same happens for a retryable exception, where I see this pattern on each retry. The consumer function itself is invoked correctly; only the error handler is triggered too many times.
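The x, x-1, x-2, ... pattern can be reproduced with a small plain-Java model (this is a hypothetical sketch, not Spring code; it only assumes that on each failure the handler receives the failed record plus everything remaining in the current poll, which is then re-seeked and re-polled):

```java
import java.util.ArrayList;
import java.util.List;

public class HandlerCallModel {

    // Total number of records the error handler sees when the first
    // `failing` records of a poll all fail with a non-retryable exception.
    static int totalRecordsSeen(int failing) {
        List<Integer> poll = new ArrayList<>();
        for (int i = 0; i < failing; i++) {
            poll.add(i);
        }
        int total = 0;
        while (!poll.isEmpty()) {
            // the handler receives the failed record and every record after it
            total += poll.size();
            // the failed record is recovered (e.g. published to the DLT);
            // the remainder is redelivered on the next poll
            poll.remove(0);
        }
        return total;
    }

    public static void main(String[] args) {
        // 2 failing records -> 2 + 1 = 3 error-handler log lines,
        // matching the three "my error log" entries above
        System.out.println(totalRecordsSeen(2));
    }
}
```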
This is my error handling configuration:
public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    private final Monitor monitor;

    CustomSeekToCurrentErrorHandler(Monitor monitor, DeadLetterPublishingRecoverer dlpr, FixedBackOff retries) {
        super(dlpr, retries);
        super.setLogLevel(KafkaException.Level.DEBUG);
        this.monitor = monitor;
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        // log and report every record passed to the handler
        records.forEach(this::logAndReportError);
        super.handle(exception, records, consumer, container);
    }
}
@Bean
public SeekToCurrentErrorHandler replayDeadLetterErrorHandler(DeadLetterPublishingRecoverer dlpr, FixedBackOff fxboff) {
    var seekToCurrent = new CustomSeekToCurrentErrorHandler(monitor, dlpr, fxboff);
    seekToCurrent.addNotRetryableException(SomeFatalException.class);
    return seekToCurrent;
}
I have two questions: why is the error handler invoked again for the records after the one that failed, and how can I avoid the duplicate log/monitoring events?
We have to seek the records after the failed one so that they are redelivered on the next poll. That means the list passed to handle() contains the failed record first, followed by all the remaining unprocessed records from that poll, which is why logging every element of the list produces the duplicate entries.
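If the goal is one log/monitoring event per failure, a sketch of the override could report only the first element of the list, since that is the record that actually failed (this reuses `logAndReportError` from the class in the question and is an illustration, not a definitive fix; note it still fires once per retry attempt):

```java
@Override
public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
    if (!records.isEmpty()) {
        // records.get(0) is the record that failed; the rest are the
        // not-yet-processed remainder of the poll that will be re-seeked
        logAndReportError(records.get(0));
    }
    super.handle(exception, records, consumer, container);
}
```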