Tags: java, spring-boot, kafka-consumer-api, spring-kafka

Better way of error handling in Kafka Consumer


I have a Spring Boot app configured with spring-kafka, where I want to handle all sorts of errors that can happen while listening to a topic. If a message cannot be consumed, because of a deserialization failure or any other exception, there should be 2 retries, after which the message should be logged to an error file. There are two approaches that can be followed:

First approach (using SeekToCurrentErrorHandler with DeadLetterPublishingRecoverer):

@Autowired
KafkaTemplate<String,Object> template;

@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<K, V> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                (r, e) -> {
                    if (e instanceof FooException) {
                        return new TopicPartition(r.topic() + ".DLT", r.partition());
                    }
                    // the destination resolver must return on every path;
                    // a negative partition lets Kafka choose the partition
                    return new TopicPartition(r.topic() + ".DLT", -1);
                });
        ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));

        factory.setErrorHandler(errorHandler);
        return factory;
    }

But for this approach we require an additional topic (a new .DLT topic), and then we can log the failed record to a file.

@Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
        return new KafkaAdmin(configs);
    }
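
With a KafkaAdmin bean in place, the .DLT topic can also be declared as a NewTopic bean so it is created automatically on startup. A minimal sketch (the bean name and partition count are assumptions; the DLT needs at least as many partitions as the source topic, because the resolver above routes to the same partition number):

@Bean
public NewTopic dltTopic() {
    // assumption: MY_TOPIC has a single partition; match its partition
    // count here so same-partition routing always has a valid target
    return TopicBuilder.name(MY_TOPIC + ".DLT").partitions(1).replicas(1).build();
}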
    
@KafkaListener(topics = MY_TOPIC + ".DLT", groupId = MY_ID)
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
    @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {

    logger.error(exceptionStackTrace);
}

Second approach (using a custom SeekToCurrentErrorHandler):

@Bean
    public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
        
        factory.setErrorHandler(new CustomSeekToCurrentErrorHandler());
        factory.setRetryTemplate(retryTemplate());
        return factory;
    }

private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    // 3 total attempts = 1 initial attempt + 2 retries
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
    return retryTemplate;
}
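
backOffPolicy() is not shown in the question; a minimal sketch, assuming a fixed one-second pause between attempts is acceptable:

private BackOffPolicy backOffPolicy() {
    // assumption: a fixed 1000 ms delay between retry attempts
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000L);
    return backOffPolicy;
}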

public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    private static final int MAX_RETRY_ATTEMPTS = 2;

    CustomSeekToCurrentErrorHandler() {
        super(MAX_RETRY_ATTEMPTS);
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        try {
            if (!records.isEmpty()) {
                log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
                super.handle(exception, records, consumer, container);
            }
        } catch (SerializationException e) {
            log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
        }
    }

}

Can anyone suggest the standard way to implement this kind of feature? With the first approach, we see the overhead of creating .DLT topics and an additional @KafkaListener. With the second approach, we can directly log the consumer record exception.


Solution

  • With the first approach, it is not necessary to use a DeadLetterPublishingRecoverer; you can use any ConsumerRecordRecoverer that you want. In fact, the default recoverer simply logs the failed message (a file-logging recoverer is sketched after the excerpts below).

    /**
     * Construct an instance with the default recoverer which simply logs the record after
     * the backOff returns STOP for a topic/partition/offset.
     * @param backOff the {@link BackOff}.
     * @since 2.3
     */
    public SeekToCurrentErrorHandler(BackOff backOff) {
        this(null, backOff);
    }
    

    And, in the FailedRecordTracker...

    if (recoverer == null) {
        this.recoverer = (rec, thr) -> {
            
            ...
    
            logger.error(thr, "Backoff "
                + (failedRecord == null
                    ? "none"
                    : failedRecord.getBackOffExecution())
                + " exhausted for " + ListenerUtils.recordToString(rec));
        };
    }
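
    To meet the requirement in the question (two retries, then log to an error file) without an extra topic, such a recoverer can be supplied in place of the DeadLetterPublishingRecoverer. A minimal sketch, assuming errorFileLogger is a Logger bound to a file appender:

    // log exhausted records to a dedicated file-backed logger instead of a DLT
    ConsumerRecordRecoverer recoverer = (rec, ex) ->
            errorFileLogger.error("Retries exhausted for " + rec.topic() + "-"
                    + rec.partition() + "@" + rec.offset(), ex);

    ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));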
    

    Back-off (and a limit on retries) was added to the error handler after retry support was added in the listener adapter, so the error-handler approach is "newer" (and preferred).

    Also, using in-memory retry can cause issues with rebalancing if long BackOffs are employed, because the consumer does not poll while retrying and can exceed max.poll.interval.ms.

    Finally, only the SeekToCurrentErrorHandler can deal with deserialization problems (via the ErrorHandlingDeserializer).

    EDIT

    Use the ErrorHandlingDeserializer together with a SeekToCurrentErrorHandler. Deserialization exceptions are considered fatal and the recoverer is called immediately.

    See the documentation.

    Here is a simple Spring Boot application that demonstrates it:

    package com.example.demo;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.ErrorHandler;
    import org.springframework.kafka.listener.ListenerUtils;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

    @SpringBootApplication
    public class So63236346Application {

        private static final Logger log = LoggerFactory.getLogger(So63236346Application.class);

        public static void main(String[] args) {
            SpringApplication.run(So63236346Application.class, args);
        }

        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so63236346").partitions(1).replicas(1).build();
        }

        @Bean
        ErrorHandler errorHandler() {
            return new SeekToCurrentErrorHandler((rec, ex) -> log.error(ListenerUtils.recordToString(rec, true) + "\n"
                    + ex.getMessage()));
        }

        @KafkaListener(id = "so63236346", topics = "so63236346")
        public void listen(String in) {
            System.out.println(in);
        }

        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                template.send("so63236346", "{\"field\":\"value1\"}");
                template.send("so63236346", "junk");
                template.send("so63236346", "{\"field\":\"value2\"}");
            };
        }

    }
    
    package com.example.demo;
    
    public class Thing {
    
        private String field;
    
        public Thing() {
        }
    
        public Thing(String field) {
            this.field = field;
        }
    
        public String getField() {
            return this.field;
        }
    
        public void setField(String field) {
            this.field = field;
        }
    
        @Override
        public String toString() {
            return "Thing [field=" + this.field + "]";
        }
    
    }
    
    application.properties:

    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Thing
    

    Result

    Thing [field=value1]
    2020-08-10 14:30:14.780 ERROR 78857 --- [o63236346-0-C-1] com.example.demo.So63236346Application   : so63236346-0@7
    Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[106, 117, 110, 107]] from topic [so63236346]
    2020-08-10 14:30:14.782  INFO 78857 --- [o63236346-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so63236346-1, groupId=so63236346] Seeking to offset 8 for partition so63236346-0
    Thing [field=value2]