Search code examples
apache-kafkaspring-kafka

Implementing Error Handling and Abort Mechanism in Kafka Consumer with Conditional Retry Logic


Spring for Apache Kafka has a default mechanism of 10 attempts after an error is thrown from a consumed message. I want this mechanism to remain in place, but I also want a way to stop it if the error thrown is the AbortException.class type.

@KafkaListener(topics = "my-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(@NotNull ConsumerRecord<String, Object> message) throws IOException {
    ...
    doSomething();
}

If doSomething() throws the AbortException exception, I want to prevent any further retries and abort the processing of this consumed message.

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    return new DefaultErrorHandler((consumerRecord, exception) -> log.error("Error while processing: {}, exception: {}", consumerRecord.topic(), exception.getClass()), fixedBackOff);
}

This handler only runs when all attempts are exhausted and the doSomething() method couldn't be executed successfully. I want something that runs after every attempt to check which exception is thrown, and if it's an AbortException, abort the remaining retries.

Is it possible?


Solution

  • See the documentation.

    and the javadocs

    /**
     * Add exception types to the default list. By default, the following exceptions will
     * not be retried:
     * <ul>
     * <li>{@link DeserializationException}</li>
     * <li>{@link MessageConversionException}</li>
     * <li>{@link ConversionException}</li>
     * <li>{@link MethodArgumentResolutionException}</li>
     * <li>{@link NoSuchMethodException}</li>
     * <li>{@link ClassCastException}</li>
     * </ul>
     * All others will be retried, unless {@link #defaultFalse()} has been called.
     * @param exceptionTypes the exception types.
     * @see #removeClassification(Class)
     * @see #setClassifications(Map, boolean)
     */
    public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
    

    You can also make it even more fine-grained...

    /**
     * Set a function to dynamically determine the {@link BackOff} to use, based on the
     * consumer record and/or exception. If null is returned, the default BackOff will be
     * used.
     * @param backOffFunction the function.
     * @since 2.6
     */
    public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
    

    on the DefaultErrorHandler.