spring spring-boot apache-kafka deserialization spring-kafka

Accessing ConsumerRecord value after ErrorHandlingDeserializer Spring Boot Kafka


I'm trying to handle deserialization errors in my Kafka listener. The goal is to write every failing record to a database. I'm using Spring Boot's autoconfiguration features. The relevant parts of my project look like this.

application.properties

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

KafkaConsumer

@KafkaListener(topics = "${kafka.topic}", properties = {
        "spring.json.value.default.type=com.test.Person"})
public void listen(
        @Header(KafkaHeaders.RECEIVED_PARTITION) String partitionId,
        @Header(KafkaHeaders.OFFSET) String offset,
        @Header(KafkaHeaders.RECEIVED_KEY) String messageKey,
        @Payload Person person) {

    ...
}

KafkaErrorHandler

public class KafkaErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaTemplate<String, byte[]> byteTemplate;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {

        ConsumerRecord<String, byte[]> test = byteTemplate.receive(data.topic(), data.partition(), data.offset());


        // This is where I want to write to my database


    }
}

My problem now is that I don't have access to the raw (not yet deserialized) data: the ConsumerRecord value is null because of the ErrorHandlingDeserializer, and KafkaTemplate.receive throws another DeserializationException. Am I missing something?

I expected to get access to the value as a byte array.


Solution

  • The thrownException is a ListenerExecutionFailedException whose cause is a DeserializationException; the raw data is available from a field of that exception:

        /**
         * Get the data that failed deserialization (value or key).
         * @return the data.
         */
        public byte[] getData() {
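
To reach that DeserializationException from inside the error handler, the cause chain of thrownException has to be searched. The following is only a minimal sketch of that step: CauseChain and findCause are hypothetical names, not part of Spring Kafka, and the helper uses nothing beyond the JDK so that it stays independent of the library version.

```java
import java.util.Optional;

/** Hypothetical helper (not part of Spring Kafka): finds a cause of a given type. */
final class CauseChain {

    private CauseChain() {
    }

    /**
     * Walks the cause chain of {@code thrown}, starting with {@code thrown} itself,
     * and returns the first exception that is an instance of {@code type}.
     */
    static <T extends Throwable> Optional<T> findCause(Throwable thrown, Class<T> type) {
        Throwable current = thrown;
        // Bound the walk so that a cyclic cause chain cannot loop forever.
        for (int depth = 0; current != null && depth < 32; depth++) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }
}
```

Inside handle(...), a call such as CauseChain.findCause(thrownException, DeserializationException.class) should then yield the exception whose getData() returns the raw bytes, which can be written to the database together with data.topic(), data.partition() and data.offset(). Searching the whole chain, rather than calling getCause() once, is a defensive choice in case the exception is wrapped more than one level deep.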