spring-kafka

SeekToCurrentErrorHandler + ExponentialBackOff logs errors only after the back-off has elapsed; is that intentional?


Context:

The docs for "stateful retry" (https://docs.spring.io/spring-kafka/reference/html/#stateful-retry) and "seek to current" (https://docs.spring.io/spring-kafka/reference/html/#seek-to-current) make it sound as if, as a user, I should migrate away from a RetryTemplate and use the BackOff support in SeekToCurrentErrorHandler instead.

I currently have a mix: a RetryTemplate that retries indefinitely for certain exceptions, plus a SeekToCurrentErrorHandler with a fixed 3 retries that handles all other exceptions.
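As a rough illustration of that split, here is a plain-Java model of the per-failure decision a back-off function of the shape `(record, ex) -> BackOff` makes. It is a hypothetical sketch, not spring-kafka code: `RetryPlan` and `TransientException` are invented stand-ins, and the 5 s / 1 s intervals and "3 attempts = 1 delivery + 2 retries" reading are assumptions. In a real handler the function would return `org.springframework.util.backoff` instances such as `new FixedBackOff(5000L, FixedBackOff.UNLIMITED_ATTEMPTS)` or `new FixedBackOff(1000L, 2L)`. The sketch walks the cause chain because spring-kafka hands the error handler a `ListenerExecutionFailedException` that wraps the listener's own exception.

```java
import java.util.function.BiFunction;

public class BackOffChoice {

    /** Hypothetical exception type the application wants to retry forever. */
    static class TransientException extends RuntimeException {
        TransientException(String message) { super(message); }
    }

    /** Hypothetical stand-in for a Spring BackOff: a fixed interval and a retry limit. */
    static final class RetryPlan {
        static final long UNLIMITED = Long.MAX_VALUE;
        final long intervalMs;
        final long maxRetries; // retries after the first delivery

        RetryPlan(long intervalMs, long maxRetries) {
            this.intervalMs = intervalMs;
            this.maxRetries = maxRetries;
        }
    }

    /** True if ex or any of its causes is a TransientException. */
    static boolean isTransient(Throwable ex) {
        for (Throwable t = ex; t != null; t = t.getCause()) {
            if (t instanceof TransientException) {
                return true;
            }
        }
        return false;
    }

    /** Same shape as the (record, ex) -> BackOff lambda: picks a plan per failure. */
    static final BiFunction<Object, Exception, RetryPlan> BACK_OFF_FUNCTION = (record, ex) ->
            isTransient(ex)
                    ? new RetryPlan(5_000L, RetryPlan.UNLIMITED) // keep retrying transient failures
                    : new RetryPlan(1_000L, 2L);                 // 1 delivery + 2 retries = 3 attempts

    public static void main(String[] args) {
        // The container wraps listener exceptions, so the model wraps one here too.
        Exception wrappedTransient = new RuntimeException("listener failed", new TransientException("broker busy"));
        Exception other = new IllegalArgumentException("bad payload");
        System.out.println(BACK_OFF_FUNCTION.apply("rec-1", wrappedTransient).maxRetries); // 9223372036854775807
        System.out.println(BACK_OFF_FUNCTION.apply("rec-2", other).maxRetries);            // 2
    }
}
```

Checking the cause chain rather than the top-level type is the design choice that matters here; an `instanceof` test on `ex` alone would never match the wrapped exception.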

Now I want to replace this setup with handler.setBackOffFunction((record, ex) -> { ... });, but I have run into the following issue.

I am not sure whether this is intended, whether I am misconfiguring something, or whether it is a bug.

  • Spring Boot 2.4.0
  • spring-kafka 2.6.3

Question:

When I use the SeekToCurrentErrorHandler with large intervals, the error message saying "hey, your listener threw an exception" only appears in the log AFTER the back-off interval has elapsed. Is this intentional? My code throws an exception, but the corresponding log entry may appear much later.

Here, line 1 was printed at 22:59:14. The exception is thrown right after that, but it only appears in the logs 10 s later, at 22:59:24. With ExponentialBackOff, that delay grows larger and larger.

foo1 at 2020-11-20T22:59:14.850721600
2020-11-20 22:59:24.976  INFO 21456 --- [  kgh1235-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh1235-1, groupId=kgh1235] Seeking to offset 21 for partition kgh1235-0
2020-11-20 22:59:24.976 ERROR 21456 --- [  kgh1235-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.roppelt.Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T22:59:14.850721600; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T22:59:14.850721600
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:157) ~[spring-kafka-2.6.3.jar:2.6.3]

This can be reproduced with the following:

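import java.time.LocalDateTime;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.ExponentialBackOff;

@SpringBootApplication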
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @KafkaListener(id = "so64937593", topics = "so64937593")
    public void listen(String in) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println(in + " at " + now);
        throw new RuntimeException("Thrown at " + now);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
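        // The recoverer lambda stands in for a dead-letter step; it only runs once the back-off is exhausted.
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(
                (rec, ex) -> System.out.println("I am the dlq"), new ExponentialBackOff());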
        factory.setErrorHandler(eh);
        return factory;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so64937593", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so64937593", "foo" + i));
        };
    }

}
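To see how the delay grows in this repro, the following plain-Java sketch reproduces the interval sequence of `new ExponentialBackOff()`, assuming the spring-core defaults are an initial interval of 2000 ms, a multiplier of 1.5 and a maximum interval of 30000 ms. `BackOffSchedule` is a hypothetical helper that only mirrors the arithmetic; it is not the Spring class.

```java
import java.util.ArrayList;
import java.util.List;

public class BackOffSchedule {

    // Assumed defaults of org.springframework.util.backoff.ExponentialBackOff
    static final long INITIAL_INTERVAL_MS = 2000L;
    static final double MULTIPLIER = 1.5;
    static final long MAX_INTERVAL_MS = 30000L;

    /** Returns the first n back-off intervals, in milliseconds. */
    static List<Long> intervals(int n) {
        List<Long> result = new ArrayList<>();
        long current = -1; // no back-off computed yet
        for (int i = 0; i < n; i++) {
            if (current >= MAX_INTERVAL_MS) {
                current = MAX_INTERVAL_MS;                                 // already capped
            } else if (current < 0) {
                current = Math.min(INITIAL_INTERVAL_MS, MAX_INTERVAL_MS); // first back-off
            } else {
                long next = (long) (current * MULTIPLIER);                 // grow, truncating to whole ms
                current = Math.min(next, MAX_INTERVAL_MS);
            }
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long total = 0;
        for (long interval : intervals(8)) {
            total += interval;
            System.out.printf("sleep %5d ms, cumulative %6d ms%n", interval, total);
        }
    }
}
```

Under these assumptions the sleeps run 2000, 3000, 4500, 6750, 10125, 15187, 22780 ms and then stay at 30000 ms, so each failed delivery waits longer before the container gets to log it. If, as I understand it, the default maximum elapsed time is unbounded, the back-off is never exhausted and the "I am the dlq" recoverer is never reached in this repro.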

I found that the following gives me a log entry right away:

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setListeners(new RetryListener[]{new RetryListener() {
    @Override
    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
        return true;
    }
    @Override
    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
    }
    @Override
    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println(LocalDateTime.now() + "Oops happened at " + context);
    }
}});
retryTemplate.setRetryPolicy(new NeverRetryPolicy());
factory.setRetryTemplate(retryTemplate);

So I add a RetryTemplate with a NeverRetryPolicy just so that I can use its listener for logging, which now gives me immediate feedback that an error happened.

foo1 at 2020-11-20T23:05:57.769425100
2020-11-20T23:05:57.769425100Oops happened at [RetryContext: count=1, lastException=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.roppelt.Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T23:05:57.769425100, exhausted=false]
2020-11-20 23:06:07.897  INFO 22608 --- [  kgh1235-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh1235-1, groupId=kgh1235] Seeking to offset 21 for partition kgh1235-0
2020-11-20 23:06:07.897 ERROR 22608 --- [  kgh1235-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

I think this is a bug, but I wanted to hold off and ask here before opening a GitHub issue.


Solution

  • The log is written by the container after the error handler exits (we have no choice about that).

    You can, however, suppress those logs by changing the log level on the SeekToCurrentErrorHandler. It sets the level on the exception, and the container logs it at that level.

    /**
     * Set the level at which the exception thrown by this handler is logged.
     * @param logLevel the level (default ERROR).
     */
    public void setLogLevel(KafkaException.Level logLevel) {
        Assert.notNull(logLevel, "'logLevel' cannot be null");
        this.logLevel = logLevel;
    }
    

    This has been available since 2.5.

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers

    Typically, the error handlers provided by the framework will throw an exception when the error is not "handled" (e.g. after performing a seek operation). By default, such exceptions are logged by the container at ERROR level. Starting with version 2.5, all the framework error handlers extend KafkaExceptionLogLevelAware which allows you to control the level at which these exceptions are logged.

    If you want to log the root cause (original exception) before the back off, you can subclass the error handler and log before calling super.handle(...).

    If you want an option for the framework to log the error, before backing off, feel free to open a GitHub issue.
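The ordering and the subclass suggestion can be modeled in plain Java. This is a hypothetical sketch, not spring-kafka code: `BackingOffHandler` stands in for SeekToCurrentErrorHandler (sleep for the back-off, seek, then throw for the container to log, matching the order of the log lines in the question), and `LoggingFirstHandler` is the suggested subclass that logs before calling `super.handle(...)`. In a real application the override would presumably be SeekToCurrentErrorHandler's `handle(Exception, List<ConsumerRecord<?, ?>>, Consumer<?, ?>, MessageListenerContainer)`, optionally combined with `setLogLevel(KafkaException.Level.DEBUG)` so the delayed container log no longer appears at ERROR.

```java
import java.util.ArrayList;
import java.util.List;

public class LogBeforeBackOff {

    /** Ordered record of what happened, standing in for a log file. */
    static final List<String> EVENTS = new ArrayList<>();

    /** Hypothetical model of a seek-to-current handler: back off, seek, then rethrow. */
    static class BackingOffHandler {
        private final long backOffMs;

        BackingOffHandler(long backOffMs) { this.backOffMs = backOffMs; }

        void handle(Exception thrown) {
            try {
                Thread.sleep(backOffMs); // the delay happens inside the handler...
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            EVENTS.add("seek");
            // ...so whatever the container logs for this exception comes afterwards
            throw new IllegalStateException("Seek to current after exception", thrown);
        }
    }

    /** Subclass that logs the original exception first, then delegates to super.handle(...). */
    static class LoggingFirstHandler extends BackingOffHandler {
        LoggingFirstHandler(long backOffMs) { super(backOffMs); }

        @Override
        void handle(Exception thrown) {
            EVENTS.add("handler log: " + thrown.getMessage()); // immediate feedback
            super.handle(thrown);
        }
    }

    /** Models the container: call the handler, then log whatever it throws. */
    static void containerInvoke(BackingOffHandler handler, Exception listenerFailure) {
        try {
            handler.handle(listenerFailure);
        } catch (RuntimeException fromHandler) {
            EVENTS.add("container log: " + fromHandler.getMessage());
        }
    }

    public static void main(String[] args) {
        containerInvoke(new LoggingFirstHandler(100L), new RuntimeException("Thrown at t0"));
        // prints: "handler log: Thrown at t0", "seek", "container log: Seek to current after exception"
        EVENTS.forEach(System.out::println);
    }
}
```

The subclass only adds a log line in front of the inherited behavior, so the back-off, seek and recovery logic stay exactly as the framework implements them.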