Using spring-cloud-stream from spring-cloud Hoxton.SR12 release with Kafka Binder. Boot version: 2.5.2
Problem statement:
I would like to handle deserialisation errors by pushing them to a poison-pill topic with no retries.
Handle any other exceptions by retrying and then pushing to a parkingLot topic.
Do not retry ValidationException
This is my error handling code so far:
public class ErrorHandlingConfig {
private String parkingLotDestination;
private long retryAttempts;
private long retryIntervalMillis;
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
return (container, dest, group) -> {
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer parkingLotPublisher) {
SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(parkingLotPublisher, new FixedBackOff(retryIntervalMillis, retryAttempts));
return seekToCurrentErrorHandler;
public DeadLetterPublishingRecoverer parkingLotPublisher(KafkaOperations bytesTemplate) {
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(bytesTemplate, (cr, e) -> new TopicPartition(parkingLotDestination, cr.partition()));
deadLetterPublishingRecoverer.setHeadersFunction((cr, e) -> cr.headers());
return deadLetterPublishingRecoverer;
I think what I have so far should cover the retryable exceptions being pushed to parking lot. How do I now add in the code to push failed deserialisation events to poison topic?
I want to do this outside of the binder/binding configuration and at the container level due to the outstanding issue of not being able to send to a custom dlqName.
I could use a ErrorHandlingDeserializer and call setFailedDeserializationFunction()
on it that would contain a function that sends the message onto poison topic. Should I do this using a Source binding or raw KafkaOperations? I also need to work out how to hook this ErrorHandingDeserialiser into the ConsumerFactory.
Why are you using Hoxton with Boot 2.5? The proper cloud version for Boot 2.5.2 is 2020.0.3
The SeekToCurrentErrorHandler
already considers DeserializationException
s to be fatal. See
* Add exception types to the default list. By default, the following exceptions will
* not be retried:
* <ul>
* <li>{@link DeserializationException}</li>
* <li>{@link MessageConversionException}</li>
* <li>{@link ConversionException}</li>
* <li>{@link MethodArgumentResolutionException}</li>
* <li>{@link NoSuchMethodException}</li>
* <li>{@link ClassCastException}</li>
* </ul>
* All others will be retried.
* @param exceptionTypes the exception types.
* @since 2.6
* @see #removeNotRetryableException(Class)
* @see #setClassifications(Map, boolean)
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
The ErrorHandlingDeserializer
(without a function) adds the exception to a header; the DeadLetterPublishingRecoverer
automatically extracts the original payload from the header and sets as the value()
of the outgoing record (byte[]
Since you are using native encoding, you will need two KafkaTemplate
s - one for the failed records that need to be re-serialized and one for the DeserializationException
s (that uses a ByteArraySerializer
* Create an instance with the provided templates and destination resolving function,
* that receives the failed consumer record and the exception and returns a
* {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
* 0, no partition is set when publishing to the topic. The templates map keys are
* classes and the value the corresponding template to use for objects (producer
* record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
* there is more than one template, to ensure the map is traversed in order. To send
* records with a null value, add a template with the {@link Void} class as a key;
* otherwise the first template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
* @param destinationResolver the resolving function.
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
I also need to work out how to hook this ErrorHandingDeserialiser into the ConsumerFactory.
Just set the appropriate properties - see the documentation.