Tags: spring, spring-boot, apache-kafka, spring-kafka

spring-kafka error handler invoked too many times


I'm using Spring's SeekToCurrentErrorHandler with a DeadLetterPublishingRecoverer.
Each time the error handler is called, it logs a message and sends an event to our monitoring system.
The problem is that the error handler is invoked too many times when consecutive records fail.
For example, with a non-retryable exception, if I produce two failing messages I get the following logs:

|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error log
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| <== my error log
|INFO|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] o.a.k.c.c.KafkaConsumer:1603|orgTest|projectTest|dev|input| [Consumer clientId=consumer-debug-2, groupId=debug] Seeking to offset 5 for partition debug-2
DEBUG|||2020-10-22 13:55:42  [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
|ERROR|Y|f0ecbdfa-3a13-4435-9cd1-b3daf73d324d|2020-10-22 13:55:42  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error logs
DEBUG|||2020-10-22 13:55:42  [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication

It seems that for x failing messages, the error handler fires x times for the first message, x-1 times for the second, x-2 times for the third, and so on.

The same happens with a retryable exception: I see this pattern on every retry. The consumer (listener) function is invoked the expected number of times; only the error handler is triggered too often.
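The counts described above are consistent with the handler receiving the failed record together with every record after it from the same poll, while the custom handler logs each record in that list. The plain-Java sketch below models only that arithmetic, under stated assumptions: all x records arrive in one poll and all of them fail, and `retries` is assumed to correspond to FixedBackOff's `maxAttempts` (so each record is delivered `retries + 1` times). The class and method names are hypothetical; this is not Spring Kafka code.

```java
import java.util.ArrayList;
import java.util.List;

public class ErrorHandlerCallModel {

    // Hypothetical model, not Spring code. Assumes all failing records arrive in one
    // poll and every one of them fails; each failed delivery triggers one handler call
    // whose list holds the failed record plus every record after it.
    // retries = 0 models a non-retryable exception (one delivery, then recovery).
    static List<Integer> listSizesPerHandlerCall(int failingRecords, int retries) {
        List<Integer> sizes = new ArrayList<>();
        for (int failed = 0; failed < failingRecords; failed++) {
            int listSize = failingRecords - failed;
            // one handler call per delivery attempt of the current record
            for (int attempt = 0; attempt <= retries; attempt++) {
                sizes.add(listSize);
            }
        }
        return sizes;
    }

    // Log lines written by a handler that logs every record in each list it receives
    static int logLinesWhenLoggingAllRecords(int failingRecords, int retries) {
        return listSizesPerHandlerCall(failingRecords, retries).stream()
                .mapToInt(Integer::intValue)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(listSizesPerHandlerCall(2, 0));       // [2, 1]
        System.out.println(logLinesWhenLoggingAllRecords(2, 0)); // 3
        System.out.println(listSizesPerHandlerCall(3, 2));       // [3, 3, 3, 2, 2, 2, 1, 1, 1]
        System.out.println(logLinesWhenLoggingAllRecords(3, 2)); // 18
    }
}
```

For two non-retryable failures the model gives three log lines, which matches the three "my error log" entries in the excerpt; in general it predicts (retries + 1) · x(x + 1)/2 lines, even though the handler itself is called only (retries + 1) · x times.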

This is my error handling configuration:

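import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
    private final Monitor monitor; // our monitoring client (application-specific type)

    CustomSeekToCurrentErrorHandler(Monitor monitor, DeadLetterPublishingRecoverer dlpr, FixedBackOff retries) {
        super(dlpr, retries);
        // Log the handler's own messages at DEBUG; our own logging happens in handle()
        super.setLogLevel(KafkaException.Level.DEBUG);
        this.monitor = monitor;
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records,
            Consumer<?, ?> consumer, MessageListenerContainer container) {
        // Log and report every record in the list passed to the handler
        records.forEach(this::logAndReportError);
        // Delegate retry, seek and dead-letter logic to SeekToCurrentErrorHandler
        super.handle(exception, records, consumer, container);
    }

    private void logAndReportError(ConsumerRecord<?, ?> record) {
        // writes "my error log" and sends an event via monitor (body omitted)
    }
}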

@Bean
public SeekToCurrentErrorHandler replayDeadLetterErrorHandler(DeadLetterPublishingRecoverer dlpr, FixedBackOff fxboff) {
    var seekToCurrent = new CustomSeekToCurrentErrorHandler(monitor, dlpr, fxboff);
    seekToCurrent.addNotRetryableException(SomeFatalException.class);
    return seekToCurrent;
}

I have two questions:

  1. Why is the error handler triggered so many times?
  2. Why is a seek executed for non-retryable exceptions?

Solution

    1. Your question is not clear; please add your code and configuration.

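    2. We have to seek the records after the failed one so that they are redelivered on the next poll. With a non-retryable exception the failed record itself is not retried: it goes straight to the recoverer (here, it is published to the dead-letter topic). The records that followed it in the same poll, however, have not been processed yet, so without the seek they would be skipped.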
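To illustrate why the seek is needed even for a non-retryable exception, here is a plain-Java model with no Spring dependency. It assumes that a single poll returns all records, that the handler receives the failed record followed by the rest of that poll, and that the exception is not retryable, so the failed record is recovered immediately. The class, the `poll` and `handle` methods, and the lists are hypothetical stand-ins, not Spring Kafka API; the real SeekToCurrentErrorHandler seeks consumer offsets rather than returning a list. The model's handler also logs only `records.get(0)`, which under these assumptions yields one log entry per failed record instead of one per remaining record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class SeekToCurrentModel {

    final List<String> processed = new ArrayList<>();
    final List<String> errorLog = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    // Delivers one "poll" to the listener; on the first failure, calls the handler with
    // the failed record plus all records after it, and returns what must be redelivered.
    List<String> poll(List<String> records, Predicate<String> listenerSucceeds) {
        for (int i = 0; i < records.size(); i++) {
            if (!listenerSucceeds.test(records.get(i))) {
                return handle(records.subList(i, records.size()));
            }
            processed.add(records.get(i));
        }
        return List.of(); // whole poll processed, nothing to redeliver
    }

    // Non-retryable case: log and recover only records.get(0), then "seek" the rest
    List<String> handle(List<String> records) {
        String failed = records.get(0);
        errorLog.add(failed);    // one log entry for the record that actually failed
        deadLetters.add(failed); // stands in for DeadLetterPublishingRecoverer
        // the "seek": unprocessed records after the failed one are redelivered
        return new ArrayList<>(records.subList(1, records.size()));
    }

    public static void main(String[] args) {
        SeekToCurrentModel model = new SeekToCurrentModel();
        List<String> batch = List.of("ok-1", "bad-1", "bad-2", "ok-2");
        while (!batch.isEmpty()) {
            batch = model.poll(batch, r -> r.startsWith("ok"));
        }
        System.out.println(model.processed);   // [ok-1, ok-2]
        System.out.println(model.errorLog);    // [bad-1, bad-2]
        System.out.println(model.deadLetters); // [bad-1, bad-2]
    }
}
```

Note that `ok-2` is still processed: it arrived in the same poll as the two failing records, and only the redelivery after each failure gets it to the listener.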