I'm trying to handle deserialization errors in my Kafka listener. The goal is to write every failing record to a database. I'm using Spring Boot's auto-configuration features. The important parts of my project look like this.
application.properties
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
KafkaConsumer
@KafkaListener(topics = "${kafka.topic}", properties = {
"spring.json.value.default.type=com.test.Person"})
public void listen( //
@Header(KafkaHeaders.RECEIVED_PARTITION) String partitionId, //
@Header(KafkaHeaders.OFFSET) String offset, //
@Header(KafkaHeaders.RECEIVED_KEY) String messageKey, //
@Payload Person person) {
...
}
KafkaErrorHandler
public class KafkaErrorHandler implements ErrorHandler {
@Autowired
private KafkaTemplate<String, byte[]> byteTemplate;
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
ConsumerRecord<String, byte[]> test = byteTemplate.receive(data.topic(), data.partition(), data.offset());
// This is where I want to write to my database
}
}
My problem is that I don't have access to the raw (not yet deserialized) data: the ConsumerRecord value is null because of the ErrorHandlingDeserializer, and KafkaTemplate.receive throws another DeserializationException. Am I missing something?
I expected to get access to the value as a byte array.
The thrownException is a ListenerExecutionFailedException, which has a cause that is a DeserializationException; the raw data is available as a field therein:
/**
* Get the data that failed deserialization (value or key).
* @return the data.
*/
public byte[] getData() {
...
}
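To make the unwrapping step concrete, here is a minimal sketch of a helper that walks an exception's cause chain and returns the first cause of a given type. The class name CauseFinder, the method name findCause, and the depth limit of 32 are assumptions made for illustration; none of them come from Spring Kafka. The helper uses only the JDK, so it does not depend on how deeply the DeserializationException is nested.

```java
import java.util.Optional;

// Hypothetical helper (not part of Spring Kafka): finds a cause of a given type.
public final class CauseFinder {

    // Assumed upper bound on chain length; guards against cyclic cause chains.
    private static final int MAX_DEPTH = 32;

    private CauseFinder() {
    }

    /**
     * Walks the cause chain starting at {@code thrown} (inclusive) and returns
     * the first throwable that is an instance of {@code type}, if any.
     */
    public static <T extends Throwable> Optional<T> findCause(Throwable thrown, Class<T> type) {
        Throwable current = thrown;
        int depth = 0;
        while (current != null && depth < MAX_DEPTH) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
            depth++;
        }
        return Optional.empty();
    }
}
```

In the error handler from the question, something like CauseFinder.findCause(thrownException, DeserializationException.class).map(DeserializationException::getData) should then yield the raw byte[] when a deserialization failure is present, and an empty Optional for any other kind of listener error, so the database write can be skipped or handled differently in that case.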