Tags: spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Handle deserialisation errors and other exceptions separately


Using spring-cloud-stream from spring-cloud Hoxton.SR12 release with Kafka Binder. Boot version: 2.5.2

Problem statement:

  • I would like to handle deserialisation errors by pushing them to a poison-pill topic with no retries.

  • Handle any other exceptions by retrying and then pushing to a parkingLot topic.

  • Do not retry ValidationException.

This is my error handling code so far:

import java.util.function.BiFunction;

import javax.validation.ValidationException;

import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

import lombok.extern.slf4j.Slf4j;

@Configuration
@Slf4j
public class ErrorHandlingConfig {

    @Value("${errorHandling.parkingLotDestination}")
    private String parkingLotDestination;

    @Value("${errorHandling.retryAttempts}")
    private long retryAttempts;

    @Value("${errorHandling.retryIntervalMillis}")
    private long retryIntervalMillis;

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
        return (container, dest, group) -> {
            container.setErrorHandler(errorHandler);
        };
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer parkingLotPublisher) {
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(
                parkingLotPublisher, new FixedBackOff(retryIntervalMillis, retryAttempts));
        seekToCurrentErrorHandler.addNotRetryableExceptions(ValidationException.class);
        return seekToCurrentErrorHandler;
    }

    @Bean
    public DeadLetterPublishingRecoverer parkingLotPublisher(KafkaOperations<byte[], byte[]> bytesTemplate) {
        DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(
                bytesTemplate, (cr, e) -> new TopicPartition(parkingLotDestination, cr.partition()));
        deadLetterPublishingRecoverer.setHeadersFunction((cr, e) -> cr.headers());

        return deadLetterPublishingRecoverer;
    }
}

I think what I have so far covers retryable exceptions being pushed to the parking lot. How do I now add the code to push failed deserialisation events to the poison topic?

I want to do this outside of the binder/binding configuration and at the container level due to the outstanding issue of not being able to send to a custom dlqName.

I could use an ErrorHandlingDeserializer and call setFailedDeserializationFunction() on it, containing a function that sends the message on to the poison topic. Should I do this using a Source binding or raw KafkaOperations? I also need to work out how to hook this ErrorHandlingDeserializer into the ConsumerFactory.


Solution

  • Why are you using Hoxton with Boot 2.5? The proper cloud version for Boot 2.5.2 is 2020.0.3.

    The SeekToCurrentErrorHandler already considers DeserializationExceptions to be fatal. See

        /**
         * Add exception types to the default list. By default, the following exceptions will
         * not be retried:
         * <ul>
         * <li>{@link DeserializationException}</li>
         * <li>{@link MessageConversionException}</li>
         * <li>{@link ConversionException}</li>
         * <li>{@link MethodArgumentResolutionException}</li>
         * <li>{@link NoSuchMethodException}</li>
         * <li>{@link ClassCastException}</li>
         * </ul>
         * All others will be retried.
         * @param exceptionTypes the exception types.
         * @since 2.6
         * @see #removeNotRetryableException(Class)
         * @see #setClassifications(Map, boolean)
         */
        @SafeVarargs
        @SuppressWarnings("varargs")
        public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
    

    The ErrorHandlingDeserializer (without a function) adds the exception to a header; the DeadLetterPublishingRecoverer automatically extracts the original payload from the header and sets it as the value() of the outgoing record (byte[]).

    Since you are using native encoding, you will need two KafkaTemplates: one for the failed records that need to be re-serialized and one for the DeserializationExceptions (which uses a ByteArraySerializer).

    See

        /**
         * Create an instance with the provided templates and destination resolving function,
         * that receives the failed consumer record and the exception and returns a
         * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
         * 0, no partition is set when publishing to the topic. The templates map keys are
         * classes and the value the corresponding template to use for objects (producer
         * record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
         * there is more than one template, to ensure the map is traversed in order. To send
         * records with a null value, add a template with the {@link Void} class as a key;
         * otherwise the first template from the map values iterator will be used.
         * @param templates the {@link KafkaOperations}s to use for publishing.
         * @param destinationResolver the resolving function.
         */
        @SuppressWarnings("unchecked")
        public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
                BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
    
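    Applied to the configuration in the question, the recoverer bean might be adapted along these lines. This is a sketch, not the author's code: `jsonTemplate` and `poisonPillDestination` are illustrative names, and the cause-chain check is a simplification of how the recoverer classifies deserialization failures.

    ```java
    @Bean
    public DeadLetterPublishingRecoverer recoverer(
            KafkaOperations<String, Object> jsonTemplate,    // re-serializes normally deserialized values
            KafkaOperations<byte[], byte[]> bytesTemplate) { // ByteArraySerializer for raw poison pills

        // LinkedHashMap preserves insertion order; the recoverer uses the first
        // template whose key class matches the outgoing record value's type.
        // DeserializationException payloads are restored as byte[] from the header,
        // so they match the bytesTemplate entry.
        Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
        templates.put(byte[].class, bytesTemplate);
        templates.put(Object.class, jsonTemplate);

        return new DeadLetterPublishingRecoverer(templates, (cr, e) -> {
            // Route poison pills and parked records to different topics
            // (checking the cause chain, since listener errors may be wrapped).
            boolean poison = e instanceof DeserializationException
                    || e.getCause() instanceof DeserializationException;
            return new TopicPartition(poison ? poisonPillDestination : parkingLotDestination,
                    cr.partition());
        });
    }
    ```
    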

    "I also need to work out how to hook this ErrorHandlingDeserializer into the ConsumerFactory."

    Just set the appropriate properties - see the documentation.
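    For the Kafka binder, that typically means setting the consumer's value deserializer to the ErrorHandlingDeserializer and naming the real deserializer as its delegate, for example (the binding name `input` and the delegate class are illustrative; adjust to your own binding and payload type):

    ```properties
    spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    spring.cloud.stream.kafka.bindings.input.consumer.configuration.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
    ```

    With this in place, a record that fails deserialization reaches the listener container with the exception in a header, where the error handler and recoverer above take over.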