Search code examples
javaspringkafka-consumer-apispring-kafka

Spring Kafka Not Retryable Exceptions Filter


I'm trying to implement dynamic Kafka exception handling, specifically reties on some conditions but cannot find any way to filter exceptions that should not be retried not only by their class. My error handler code looks like this:

DefaultErrorHandler errorHandler = new DefaultErrorHandler(
                (consumerRecord, e) -> {
                    log.error("Exception happened for {}", consumerRecord, e);
                },
                exponentialBackOff);

errorHandler.addNotRetryableExceptions(NotRetryableException.class);

but I want to control what is a non-retryable exception more granularly like:

DefaultErrorHandler errorHandler = new DefaultErrorHandler(
                (consumerRecord, e) -> {
                    log.error("Exception happened for {}", consumerRecord, e);
                },
                exponentialBackOff);

errorHandler.addNotRetryableExceptions(NotRetryableException.class);
errorHandler.addNotRetryableExceptionFilter(throwable -> throwable.getMessage().startsWith("foo"));

I cannot find such functionality inside the standard implementation. I checked sources and found BinaryExceptionClassifier#classify which checks fatal/not fatal exceptions but perhaps customizing it is not the best way to deal with it.

Is there any way to effectively have a filter to check whether the expection is retryable or not?


Solution

  • Consider to use this API of that error handler:

    /**
     * Set a function to dynamically determine the {@link BackOff} to use, based on the
     * consumer record and/or exception. If null is returned, the default BackOff will be
     * used.
     * @param backOffFunction the function.
     * @since 2.6
     */
    public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
    

    https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html#default-eh