Search code examples
spring-bootapache-kafkaspring-kafka

Kafka Serializers vs Spring Message Converters in DeadLetterPublishingRecoverer


I'm a little confused by the serializers and message converters in spring-kafka. When should the Spring message converters be used and when only the Kafka serializers? As far as I could see in Spring the prefered way would be to configure the Kafka clients key/value serializer with StringSerializer and then configure a message converter on the KafkaTemplate for the actual POJO to string conversion. Is that correct?

I'm stuck trying to configure a DeadLetterPublishingRecoverer which should send messages to DLT for derialization errors as well as for any error while processing the deserialized message. The problem is that I need a JsonSerializer when the message was already deserialized but only a simple StringSerializer when the message could not be deserialized. Any ideas how to configure that?


Solution

  • Message conversion in the template only applies to the send() method that takes a Message<?>.

    Use one of the DLPR constructors that takes a map of templates:

        /**
         * Create an instance with the provided templates and a default destination resolving
         * function that returns a TopicPartition based on the original topic (appended with
         * ".DLT") from the failed record, and the same partition as the failed record.
         * Therefore the dead-letter topic must have at least as many partitions as the
         * original topic. The templates map keys are classes and the value the corresponding
         * template to use for objects (producer record values) of that type. A
         * {@link java.util.LinkedHashMap} is recommended when there is more than one
         * template, to ensure the map is traversed in order. To send records with a null
         * value, add a template with the {@link Void} class as a key; otherwise the first
         * template from the map values iterator will be used.
         * @param templates the {@link KafkaOperations}s to use for publishing.
         */
        public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {
            this(templates, DEFAULT_DESTINATION_RESOLVER);
        }
    

    Add a template configured for byte[] values (with a ByteArraySerializer on its producer) for the deserialization exceptions.