apache-kafka spring-kafka

Publish messages that could not be de-serialized to DLT topic


I do not understand how messages that could not be de-serialized can be written to a DLT topic with Spring Kafka.

I configured the consumer according to the Spring Kafka docs, and this works well for exceptions that occur after de-serialization of the message.

But when the message is not de-serializable, an org.apache.kafka.common.errors.SerializationException is thrown while polling for messages.

Subsequently, SeekToCurrentErrorHandler.handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, ...) is called with this exception but with an empty list of records, so it has nothing to write to the DLT topic.

How can I write those messages to the DLT topic as well?


Solution

  • The problem is that the exception is thrown by the Kafka client itself so Spring doesn't get to see the actual record that failed.

    That's why we added the ErrorHandlingDeserializer2 which can be used to wrap the actual deserializer; the failure is passed to the listener container and re-thrown as a DeserializationException.

    See the documentation.

    When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer2. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer2 returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

    The DeadLetterPublishingRecoverer has logic to detect the exception and publish the failed record.
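
As a rough illustration of the wrapping described above, here is a minimal sketch of consumer properties that put ErrorHandlingDeserializer2 in front of the real deserializers. It uses only java.util so it runs without Spring on the classpath. The property keys are written out as string literals and are assumed to match the ErrorHandlingDeserializer2 constants KEY_DESERIALIZER_CLASS and VALUE_DESERIALIZER_CLASS, so check them against your spring-kafka version. The class name DltConsumerProps, the choice of StringDeserializer and JsonDeserializer as delegates, and the broker and group values are hypothetical placeholders, not taken from the question.

```java
import java.util.HashMap;
import java.util.Map;

public class DltConsumerProps {

    // Assumed to match ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS and
    // VALUE_DESERIALIZER_CLASS; verify against your spring-kafka version.
    static final String KEY_DELEGATE = "spring.deserializer.key.delegate.class";
    static final String VALUE_DELEGATE = "spring.deserializer.value.delegate.class";

    static final String ERROR_HANDLING_DESERIALIZER =
            "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2";

    /** Builds consumer properties in which both deserializers are wrapped. */
    static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);

        // Kafka instantiates the wrapper, not the real deserializer.
        props.put("key.deserializer", ERROR_HANDLING_DESERIALIZER);
        props.put("value.deserializer", ERROR_HANDLING_DESERIALIZER);

        // The wrapper delegates to these; the delegates are example choices only.
        props.put(KEY_DELEGATE, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(VALUE_DELEGATE, "org.springframework.kafka.support.serializer.JsonDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration (map iteration order is not guaranteed).
        consumerProps("localhost:9092", "example-group")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A map like this would be handed to the consumer factory in place of the plain deserializer settings. The error handler and DeadLetterPublishingRecoverer setup from the question stays as it is; with the wrapper in place, the failed record reaches the error handler instead of an empty list.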