Tags: java, apache-kafka, spring-kafka

How to skip corrupt (non-serializable) messages in Spring Kafka Consumer?


This question is about Spring Kafka and is related to "Apache Kafka with High Level Consumer: Skip corrupted messages".

Is there a way to configure a Spring Kafka consumer to skip a record that cannot be read or processed (that is, a corrupt record)?

I am seeing a situation where the consumer gets stuck on the same record if it cannot be deserialized. This is the error the consumer throws.

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value 

The consumer polls the topic and keeps printing the same error in a loop until the program is killed.

The @KafkaListener uses a consumer factory with the following configuration:

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

Solution

  • You need ErrorHandlingDeserializer: https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-deserializer

    If you can't move to version 2.2, consider implementing your own deserializer that returns null for records that can't be deserialized properly.

    The source code is here: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java
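
As a rough illustration of the first suggestion, the sketch below rebuilds the consumer properties from the question so that ErrorHandlingDeserializer wraps the original deserializers. With this wrapper, a deserialization failure should be handed to the container's error handling rather than being thrown again on every poll. Several things here are assumptions, not part of the original answer: the class name SkipCorruptRecordsConfig and the address localhost:9092 are placeholders, the plain string keys are the values behind the ConsumerConfig constants and the wrapper's delegate properties, and the wrapper's class name depends on the Spring Kafka version (it is ErrorHandlingDeserializer in 2.2 and again from 2.5, and ErrorHandlingDeserializer2 in 2.3 and 2.4, as far as I know).

```java
import java.util.HashMap;
import java.util.Map;

public class SkipCorruptRecordsConfig {

    // Assumed class names; check them against the Spring Kafka version in use.
    static final String ERROR_HANDLING_DESERIALIZER =
            "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer";
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";
    static final String JSON_DESERIALIZER =
            "org.springframework.kafka.support.serializer.JsonDeserializer";

    // Builds consumer properties in which the error-handling wrapper is the
    // deserializer Kafka instantiates, and the original deserializers are its delegates.
    static Map<String, Object> consumerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);

        // Kafka calls the wrapper for both key and value.
        props.put("key.deserializer", ERROR_HANDLING_DESERIALIZER);
        props.put("value.deserializer", ERROR_HANDLING_DESERIALIZER);

        // The wrapper forwards to these delegates and catches their exceptions.
        props.put("spring.deserializer.key.delegate.class", STRING_DESERIALIZER);
        props.put("spring.deserializer.value.delegate.class", JSON_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        consumerProps("localhost:9092")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The returned map can be passed to the consumer factory in place of the original props. Class names are given as strings only to keep the sketch free of library imports; in a real project the Class objects and the library's constants would be the more usual choice.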