Search code examples
spring-cloudspring-cloud-streamdead-letterspring-cloud-stream-binder-kafka

How to Handle Deserialization Exception & Converting to New Schema with Spring Cloud Stream?


I am have trouble understanding how to properly handle a deserialization exception within Spring Cloud stream. Primarily because the framework implemented does not support headers and the DLQ is supposed to be a separate schema than the original message. So the process flow needs to be: consume message -> deserialization error -> DlqHandler -> serialize with NEW schema -> send to DLQ

The documentation linked below doesn't give a good idea on if that is even possible. I have seen quite a few examples of SeekToCurrentErrorHandler for Spring-Kafka but those to my knowledge are different implementations and do not match with how I could properly get the deserialization error and then have a section for custom code to serialize into a new format and move from there.

My main question is: Is capturing the deserialization exception and reserializing possible with spring cloud streams (kafka)?

Spring Cloud Documentation for DLQ


Solution

  • Yes, but not using the binding retry or DLQ properties.

    Instead, add a ListenerContainerCustomizer bean and customize the binding's listener container with a SeekToCurrentErrorHandler configured for the retries you need and, probably, a subclass of the DeadLetterPublishingRecoverer using an appropriately configured KafkaTemplate and possibly overriding the createProducerRecord method.