I'm working with a Spring Kafka batch listener and using ErrorHandlingDeserializer to handle deserialization errors. When a record fails deserialization, the deserializer adds a 'springDeserializerExceptionValue' header to it. I'm trying to preserve this header and send the message to another topic for retry processing.
However, I'm encountering the following error:
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.springframework.kafka.support.serializer.DeserializationExceptionHeader and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.kafka.support.serializer.FailedDeserializationInfo["headers"])
The error occurs in the DefaultKafkaHeaderMapper when trying to map the 'failedDeserializationInfo' header. I've tried to send the message with its original headers intact, but it seems the FailedDeserializationInfo object, which contains the DeserializationExceptionHeader, is causing serialization issues. How can I properly handle and serialize the FailedDeserializationInfo, particularly the DeserializationExceptionHeader, when retrying messages? Is there a way to preserve this information without running into serialization problems?
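For reference, the publishing path that fails looks roughly like this (a simplified sketch; the class and topic names are placeholders for my actual setup, but the 'failedDeserializationInfo' header is the one from the error above):

```java
// Simplified sketch of the retry publish that triggers the error
// (class and topic names are placeholders).
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.FailedDeserializationInfo;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class RetryPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public RetryPublisher(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(FailedDeserializationInfo info) {
        Message<byte[]> message = MessageBuilder
                .withPayload(info.getData())                  // the raw, undeserializable bytes
                .setHeader(KafkaHeaders.TOPIC, "retry-topic")
                .setHeader("failedDeserializationInfo", info) // the header the mapper chokes on
                .build();
        // send(Message<?>) runs the headers through DefaultKafkaHeaderMapper.fromHeaders(),
        // which tries to JSON-serialize the FailedDeserializationInfo and fails on the
        // DeserializationExceptionHeader inside its 'headers' property.
        kafkaTemplate.send(message);
    }
}
```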
You have to re-map that internal DeserializationExceptionHeader type into a regular RecordHeader before you send the record to the other topic. The header name is SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER or SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, for the key or the value respectively.
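For example, something along these lines (a minimal sketch, assuming you republish the raw ConsumerRecord through a KafkaTemplate; the class and topic names are illustrative):

```java
// Sketch: copy the failed record's headers, replacing the internal
// DeserializationExceptionHeader instances with plain RecordHeaders carrying
// the same key and bytes, then publish to the retry topic.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.SerializationUtils;

public class RetryForwarder {

    private final KafkaTemplate<byte[], byte[]> kafkaTemplate;

    public RetryForwarder(KafkaTemplate<byte[], byte[]> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void forward(ConsumerRecord<byte[], byte[]> failed) {
        RecordHeaders headers = new RecordHeaders();
        for (Header header : failed.headers()) {
            String key = header.key();
            if (SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER.equals(key)
                    || SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER.equals(key)) {
                // Re-map the internal DeserializationExceptionHeader to a regular RecordHeader
                headers.add(new RecordHeader(key, header.value()));
            }
            else {
                headers.add(header);
            }
        }
        kafkaTemplate.send(new ProducerRecord<>("retry-topic", null,
                failed.key(), failed.value(), headers));
    }
}
```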
This internal type was introduced to protect the consumer from malicious data sent by a producer: only exception headers created by the framework itself are deserialized back into a DeserializationException.
Yes, that DefaultKafkaHeaderMapper can be extended for the logic we are discussing: override its fromHeaders() method.
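A minimal sketch of such an override (assuming you simply want to pass values that already are Kafka Header instances through as plain RecordHeaders; the class name and the filtering rule are illustrative):

```java
// Sketch: pass values that are already Kafka Header instances (such as the internal
// DeserializationExceptionHeader) straight through as plain RecordHeaders, and let
// the default mapping handle everything else.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.messaging.MessageHeaders;

public class PassThroughHeaderMapper extends DefaultKafkaHeaderMapper {

    @Override
    public void fromHeaders(MessageHeaders headers, Headers target) {
        Map<String, Object> remaining = new HashMap<>();
        headers.forEach((key, value) -> {
            if (value instanceof Header) {
                // Re-map the internal header type into a regular RecordHeader
                // instead of letting the mapper JSON-serialize it.
                target.add(new RecordHeader(key, ((Header) value).value()));
            }
            else {
                remaining.put(key, value);
            }
        });
        super.fromHeaders(new MessageHeaders(remaining), target);
    }
}
```

You would then register the custom mapper on the MessagingMessageConverter used for outbound records via its setHeaderMapper() method.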