Spring Kafka set logging level on container's CommonErrorHandler


I have a Kafka consumer application with a custom CommonErrorHandler that handles exceptions thrown by @KafkaListener methods. It uses a FixedBackOff that retries up to 3 times and then publishes the record to a DLQ topic. However, every retry prints the entire stack trace of the error. Can I lower that logging to, say, DEBUG level so it does not clutter the console output? This is what I currently have in place (Kotlin):

factory.setCommonErrorHandler(
    DefaultErrorHandler({ record, exception ->
        val thrownException = exception.cause ?: exception.localizedMessage
        log.error(
            "Kafka record offset=${record.offset()} is being sent to the DLQ due to=" +
                "$thrownException"
        )
        val producerRecord = ProducerRecord<String, String>(dlqTopic, record.value().toString())
        producerRecord.headers().add("dlq-failure-reason", thrownException.toString().toByteArray())
        kafkaTemplate?.send(producerRecord)
    }, FixedBackOff(0L, 2L))
)

I have looked at https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers, which says: "All of the framework error handlers extend KafkaExceptionLogLevelAware which allows you to control the level at which these exceptions are logged." However, I have tried extending that class and setting its log level explicitly, to no avail. Am I missing something?


Solution

  • You don't need to subclass it; just set the logLevel property on the error handler.

    All that KafkaExceptionLogLevelAware does is set the log level on the KafkaException thrown by the error handler; it's the container that then logs it via the exception's selfLog method.

    Separately, you are logging the failure yourself at error level (during recovery); the error handler's log level does not affect that call.
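
    As a minimal sketch of how this could look for the configuration in the question, here is a Java rendering of the same error handler with the log level set. The class name, the DLQ_TOPIC placeholder value, and the use of SLF4J are assumptions for illustration; the recoverer body and FixedBackOff(0L, 2L) are carried over from the question. It needs spring-kafka on the classpath and can be passed to factory.setCommonErrorHandler(...) as in the question.

    ```java
    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.KafkaException;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.DefaultErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    @Configuration
    public class QuietErrorHandlerConfig { // hypothetical class name

        private static final Logger log = LoggerFactory.getLogger(QuietErrorHandlerConfig.class);

        // Placeholder: the question does not show the value of dlqTopic.
        private static final String DLQ_TOPIC = "my-topic.DLQ";

        @Bean
        DefaultErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {
            DefaultErrorHandler handler = new DefaultErrorHandler((record, exception) -> {
                Throwable cause = exception.getCause() != null ? exception.getCause() : exception;
                // This is the recoverer's own log call. setLogLevel below does not
                // change it; lower it here if one line per DLQ'd record is too noisy.
                log.error("Kafka record offset={} is being sent to the DLQ due to={}",
                        record.offset(), cause.toString());
                ProducerRecord<String, String> dlqRecord =
                        new ProducerRecord<>(DLQ_TOPIC, String.valueOf(record.value()));
                dlqRecord.headers().add("dlq-failure-reason",
                        cause.toString().getBytes(StandardCharsets.UTF_8));
                kafkaTemplate.send(dlqRecord);
            }, new FixedBackOff(0L, 2L)); // same back-off as in the question

            // Exceptions thrown by the handler on each retry are now logged at DEBUG.
            handler.setLogLevel(KafkaException.Level.DEBUG);
            return handler;
        }
    }
    ```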

    EDIT

    Works fine for me...

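    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.KafkaException.Level;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.DefaultErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    @SpringBootApplication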
    public class So71373630Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So71373630Application.class, args).close();
        }
    
        @Bean
        DefaultErrorHandler eh() {
            DefaultErrorHandler eh = new DefaultErrorHandler((r, ex) -> System.out.println("Failed:" + r.value()),
                    new FixedBackOff(0L, 3L));
            eh.setLogLevel(Level.DEBUG);
            return eh;
        }
    
        @KafkaListener(id = "so71373630", topics = "so71373630")
        void listen(String in) {
            System.out.println(in);
            throw new RuntimeException("test");
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so71373630").partitions(1).replicas(1).build();
        }
    
        @Bean
        ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                template.send("so71373630", "foo");
                Thread.sleep(5000);
            };
        }
    
    }
    
    2022-03-07 10:10:35.460  INFO 31409 --- [o71373630-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so71373630: partitions assigned: [so71373630-0]
    foo
    2022-03-07 10:10:35.492  INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
    foo
    2022-03-07 10:10:35.987  INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
    foo
    2022-03-07 10:10:36.490  INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
    foo
    Failed:foo
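
The output above contains no stack traces because the exception thrown by the error handler carries the DEBUG level and the container logs it at that level. The following toy model, in plain Java, may help make that mechanism concrete. It is not Spring's source code: LeveledException, ToyLogger, ToyErrorHandler, and runOnce are hypothetical stand-ins invented for this sketch, and the message text is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;

public class SelfLogSketch {

    // Declared in ascending severity so compareTo can act as a threshold check.
    enum Level { DEBUG, INFO, ERROR }

    /** Stand-in for an exception that carries the level it should be logged at. */
    static class LeveledException extends RuntimeException {
        final Level level;

        LeveledException(Level level, String message, Throwable cause) {
            super(message, cause);
            this.level = level;
        }

        /** The exception picks the level; the logger decides whether to emit. */
        void selfLog(ToyLogger logger) {
            logger.log(level, getMessage());
        }
    }

    /** Stand-in logger that keeps only entries at or above its threshold. */
    static class ToyLogger {
        final Level threshold;
        final List<String> lines = new ArrayList<>();

        ToyLogger(Level threshold) {
            this.threshold = threshold;
        }

        void log(Level level, String message) {
            if (level.compareTo(threshold) >= 0) {
                lines.add(level + " " + message);
            }
        }
    }

    /** Stand-in error handler: wraps the listener failure at the configured level. */
    static class ToyErrorHandler {
        private Level logLevel = Level.ERROR;

        void setLogLevel(Level level) {
            this.logLevel = level;
        }

        void handle(Exception listenerFailure) {
            throw new LeveledException(logLevel, "listener failed", listenerFailure);
        }
    }

    /** Stand-in container: invokes the handler and lets the exception log itself. */
    static List<String> runOnce(ToyErrorHandler handler, Level loggerThreshold) {
        ToyLogger logger = new ToyLogger(loggerThreshold);
        try {
            handler.handle(new RuntimeException("test"));
        } catch (LeveledException e) {
            e.selfLog(logger);
        }
        return logger.lines;
    }

    public static void main(String[] args) {
        ToyErrorHandler handler = new ToyErrorHandler();
        System.out.println(runOnce(handler, Level.INFO)); // prints [ERROR listener failed]
        handler.setLogLevel(Level.DEBUG);
        System.out.println(runOnce(handler, Level.INFO)); // prints []
    }
}
```

In this model, as in the explanation above, subclassing the handler changes nothing by itself; what matters is the level stored on the exception that reaches the code doing the logging.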