I am having trouble understanding how to properly handle a deserialization exception within Spring Cloud Stream, primarily because the framework implemented does not support headers and the DLQ is supposed to use a different schema from the original message. So the process flow needs to be: consume message -> deserialization error -> DlqHandler -> serialize with NEW schema -> send to DLQ
The documentation linked below doesn't make clear whether that is even possible. I have seen quite a few examples of SeekToCurrentErrorHandler for Spring-Kafka, but those, to my knowledge, are different implementations and don't show how I could capture the deserialization error and then run custom code to serialize into a new format before sending it on.
My main question is: Is capturing the deserialization exception and reserializing possible with Spring Cloud Stream (Kafka)?
Yes, but not using the binding retry or DLQ properties.
Instead, add a ListenerContainerCustomizer bean and customize the binding's listener container with a SeekToCurrentErrorHandler configured for the retries you need and, probably, a subclass of the DeadLetterPublishingRecoverer using an appropriately configured KafkaTemplate and possibly overriding the createProducerRecord method.
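
Roughly, that wiring could look like the sketch below. It is not a drop-in implementation and makes several assumptions: spring-kafka 2.7 or 2.8 (the createProducerRecord signature differs in earlier versions, and SeekToCurrentErrorHandler was replaced by DefaultErrorHandler in 3.0), a KafkaOperations bean whose value serializer already produces your DLQ schema, and a consumer set up so the deserialization failure actually reaches the container's error handler (for example via ErrorHandlingDeserializer). The class names DlqConfig and ReserializingRecoverer, the ".dlq" topic suffix, and the toDlqPayload helper are hypothetical placeholders.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DlqConfig {

    // Assumption: dlqTemplate is a bean you define with serializers for the NEW DLQ schema.
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
            KafkaOperations<Object, Object> dlqTemplate) {
        return (container, destinationName, group) -> {
            ReserializingRecoverer recoverer = new ReserializingRecoverer(dlqTemplate);
            // No delay, up to 2 retries, then hand the record to the recoverer.
            container.setErrorHandler(
                    new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L)));
        };
    }

    // Hypothetical subclass: converts the failed record into the DLQ format before publishing.
    static class ReserializingRecoverer extends DeadLetterPublishingRecoverer {

        ReserializingRecoverer(KafkaOperations<Object, Object> template) {
            // Hypothetical naming: <original topic>.dlq; a negative partition lets Kafka choose.
            super(template, (record, ex) -> new TopicPartition(record.topic() + ".dlq", -1));
        }

        @Override
        protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
                TopicPartition topicPartition, Headers headers, byte[] key, byte[] value) {
            // 'value' holds the raw bytes only when value deserialization failed
            // and the failure was captured for the recoverer; otherwise it is null.
            Object dlqPayload = toDlqPayload(record, value);
            Integer partition = topicPartition.partition() < 0 ? null : topicPartition.partition();
            // Headers are deliberately not forwarded, since the question says they are unsupported.
            return new ProducerRecord<>(topicPartition.topic(), partition,
                    key != null ? key : record.key(), dlqPayload);
        }

        // Hypothetical placeholder: build whatever object your DLQ schema expects here.
        private Object toDlqPayload(ConsumerRecord<?, ?> record, byte[] rawValue) {
            String raw = rawValue != null
                    ? new String(rawValue, StandardCharsets.UTF_8)
                    : String.valueOf(record.value());
            return record.topic() + "@" + record.offset() + ": " + raw;
        }
    }
}
```

The customizer is applied to the binding's listener container when it is created, so the binder-level retry and DLQ properties should stay unset to avoid two competing error-handling paths.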