Using spring-cloud-stream from spring-cloud Hoxton.SR12 release with Kafka Binder. Boot version: 2.5.2
Problem statement:
I would like to handle deserialisation errors by pushing them to a poison-pill topic with no retries.
Handle any other exceptions by retrying and then pushing to a parkingLot topic.
Do not retry ValidationException.
This is my error handling code so far:
@Configuration
@Slf4j
public class ErrorHandlingConfig {

    @Value("${errorHandling.parkingLotDestination}")
    private String parkingLotDestination;

    @Value("${errorHandling.retryAttempts}")
    private long retryAttempts;

    @Value("${errorHandling.retryIntervalMillis}")
    private long retryIntervalMillis;

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
        return (container, dest, group) -> {
            container.setErrorHandler(errorHandler);
        };
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer parkingLotPublisher) {
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(parkingLotPublisher, new FixedBackOff(retryIntervalMillis, retryAttempts));
        seekToCurrentErrorHandler.addNotRetryableExceptions(ValidationException.class);
        return seekToCurrentErrorHandler;
    }

    @Bean
    public DeadLetterPublishingRecoverer parkingLotPublisher(KafkaOperations bytesTemplate) {
        DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(bytesTemplate, (cr, e) -> new TopicPartition(parkingLotDestination, cr.partition()));
        deadLetterPublishingRecoverer.setHeadersFunction((cr, e) -> cr.headers());
        return deadLetterPublishingRecoverer;
    }
}
I think what I have so far should cover the retryable exceptions being pushed to parking lot. How do I now add in the code to push failed deserialisation events to poison topic?
I want to do this outside of the binder/binding configuration and at the container level due to the outstanding issue of not being able to send to a custom dlqName.
I could use an ErrorHandlingDeserializer and call setFailedDeserializationFunction() on it, passing a function that sends the message to the poison topic. Should I do this using a Source binding or raw KafkaOperations? I also need to work out how to hook this ErrorHandlingDeserializer into the ConsumerFactory.
Why are you using Hoxton with Boot 2.5? The proper cloud version for Boot 2.5.2 is 2020.0.3.
The SeekToCurrentErrorHandler already considers DeserializationExceptions to be fatal. See:
/**
 * Add exception types to the default list. By default, the following exceptions will
 * not be retried:
 * <ul>
 * <li>{@link DeserializationException}</li>
 * <li>{@link MessageConversionException}</li>
 * <li>{@link ConversionException}</li>
 * <li>{@link MethodArgumentResolutionException}</li>
 * <li>{@link NoSuchMethodException}</li>
 * <li>{@link ClassCastException}</li>
 * </ul>
 * All others will be retried.
 * @param exceptionTypes the exception types.
 * @since 2.6
 * @see #removeNotRetryableException(Class)
 * @see #setClassifications(Map, boolean)
 */
@SafeVarargs
@SuppressWarnings("varargs")
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
The ErrorHandlingDeserializer (without a function) adds the exception to a header; the DeadLetterPublishingRecoverer automatically extracts the original payload from the header and sets it as the value() of the outgoing record (byte[]).
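Since a failed deserialization reaches the recoverer as a DeserializationException, one option is to keep a single DeadLetterPublishingRecoverer and choose the topic in its destination resolver. The sketch below shows only that routing decision, written against JDK types so it can run on its own. DlqRouting, resolveTopic and the topic names are hypothetical, and walking the cause chain is a precaution that assumes the exception may arrive wrapped in another one.

```java
final class DlqRouting {

    private DlqRouting() {
    }

    /** Returns true if the failure, or anything in its cause chain, is of the given type. */
    static boolean causedBy(Throwable failure, Class<? extends Throwable> type) {
        int depth = 0;
        // The depth limit guards against pathological cause cycles.
        for (Throwable t = failure; t != null && depth < 50; t = t.getCause(), depth++) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    /** Picks the poison topic for failures caused by poisonType, the parking lot otherwise. */
    static String resolveTopic(Throwable failure, Class<? extends Throwable> poisonType,
            String poisonTopic, String parkingLotTopic) {
        return causedBy(failure, poisonType) ? poisonTopic : parkingLotTopic;
    }
}
```

In the parkingLotPublisher bean from the question, the resolver could then look something like (cr, e) -> new TopicPartition(DlqRouting.resolveTopic(e, DeserializationException.class, poisonDestination, parkingLotDestination), cr.partition()), where poisonDestination is a hypothetical new property.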
Since you are using native encoding, you will need two KafkaTemplates: one for the failed records that need to be re-serialized, and one for the DeserializationExceptions (which uses a ByteArraySerializer). See:
/**
 * Create an instance with the provided templates and destination resolving function,
 * that receives the failed consumer record and the exception and returns a
 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
 * 0, no partition is set when publishing to the topic. The templates map keys are
 * classes and the value the corresponding template to use for objects (producer
 * record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
 * there is more than one template, to ensure the map is traversed in order. To send
 * records with a null value, add a template with the {@link Void} class as a key;
 * otherwise the first template from the map values iterator will be used.
 * @param templates the {@link KafkaOperations}s to use for publishing.
 * @param destinationResolver the resolving function.
 */
@SuppressWarnings("unchecked")
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
        BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
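To make the ordering advice in that Javadoc concrete, here is a small stand-alone sketch of a class-keyed lookup. It is not the library's code: TemplateLookup and findFor are hypothetical names, the "templates" are plain strings standing in for KafkaOperations instances, and falling back to the first entry for an unmatched non-null value is an assumption on my part.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TemplateLookup {

    private TemplateLookup() {
    }

    /**
     * Returns the first entry whose key class matches the value's type.
     * Null values use the Void entry if present; anything unmatched falls
     * back to the first entry in iteration order. The map must not be empty.
     */
    static <T> T findFor(Object value, Map<Class<?>, T> templates) {
        if (value == null && templates.containsKey(Void.class)) {
            return templates.get(Void.class);
        }
        if (value != null) {
            for (Map.Entry<Class<?>, T> entry : templates.entrySet()) {
                if (entry.getKey().isInstance(value)) {
                    return entry.getValue();
                }
            }
        }
        return templates.values().iterator().next();
    }

    /** Most specific key first, so byte[] payloads never fall through to the Object entry. */
    static Map<Class<?>, String> exampleTemplates() {
        Map<Class<?>, String> templates = new LinkedHashMap<>();
        templates.put(byte[].class, "bytesTemplate");
        templates.put(Object.class, "reserializingTemplate");
        return templates;
    }
}
```

With this ordering a raw byte[] payload from a failed deserialization picks the byte-array entry, while any other object picks the second one. If Object.class were registered first it would match everything, which is why an ordered map is recommended.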
"I also need to work out how to hook this ErrorHandlingDeserializer into the ConsumerFactory."
Just set the appropriate properties - see the documentation.
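As a rough illustration of what those properties look like, the sketch below builds the two value-side entries as a plain map. The property names are the standard Kafka value.deserializer key and Spring Kafka's delegate-class key. The class and method names are hypothetical, and the delegate class you pass in is whatever deserializer you use today.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ErrorHandlingDeserializerProps {

    private ErrorHandlingDeserializerProps() {
    }

    /**
     * Consumer properties that wrap an existing value deserializer in the
     * ErrorHandlingDeserializer. delegateClassName is the fully qualified
     * name of the deserializer that does the real work.
     */
    static Map<String, Object> valueDeserializerProps(String delegateClassName) {
        Map<String, Object> props = new LinkedHashMap<>();
        // Kafka instantiates this class as the value deserializer.
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        // The ErrorHandlingDeserializer reads this to find its delegate.
        props.put("spring.deserializer.value.delegate.class", delegateClassName);
        return props;
    }
}
```

With the Kafka binder these would normally be supplied as consumer configuration properties rather than built in code, for example under spring.cloud.stream.kafka.bindings.<binding>.consumer.configuration, with native decoding enabled on the binding. Treat that placement as an assumption and check it against the binder documentation for your version.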