spring-boot, spring-kafka

Customized DeadLetterPublishingRecoverer not being invoked


We are upgrading from Spring Boot 2 / spring-kafka 2.8.4 to Spring Boot 3 / spring-kafka 3.1.2 and have had to transition from SeekToCurrentErrorHandler to a CommonErrorHandler. Our original attempt was to replace the method that created the SeekToCurrentErrorHandler with a method that creates a DefaultErrorHandler via the constructor that takes a ConsumerRecordRecoverer and a BackOff. The BiFunction we pass to the ConsumerRecordRecoverer (the DeadLetterPublishingRecoverer's destination resolver) is never invoked.
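
For reference, the pre-upgrade wiring was along these lines (a simplified sketch; the real BackOff came from our RetrySettings, exactly as in the code further down):

    // spring-kafka 2.8.x (before the upgrade): same recoverer and back-off, old handler type
    private SeekToCurrentErrorHandler feedsErrorHandler(
            KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate, RetrySettings retrySettings) {
        final var backOffWithMaxRetries = createBackOffWithMaxRetries(retrySettings);
        return new SeekToCurrentErrorHandler(
                deadLetterPublishingRecoverer(kafkaTemplate), backOffWithMaxRetries);
    }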

What are we doing wrong on this upgrade?

We have also tried just creating a DefaultErrorHandler bean with this setup, with the same results.

Here is our setup, all in a @Configuration class annotated with @EnableKafka:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory,
            KafkaTemplate<Object, Object> kafkaTemplate) {
        final var factory = new ConcurrentKafkaListenerContainerFactory<>();

        configurer.configure(factory, consumerFactory);

        factory.setCommonErrorHandler(
                feedsErrorHandler(kafkaTemplate, retryProperties.getMainTopic()));
        return factory;
    }

    private DefaultErrorHandler feedsErrorHandler(
            KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate, RetrySettings retrySettings) {
        final var backOffWithMaxRetries = createBackOffWithMaxRetries(retrySettings);
        return createCommonErrorHandler(kafkaTemplate, backOffWithMaxRetries);
    }

    private ExponentialBackOffWithMaxRetries createBackOffWithMaxRetries(
            RetrySettings retrySettings) {
        final var backOffWithMaxRetries =
                new ExponentialBackOffWithMaxRetries(retrySettings.getMaxRetries());
        backOffWithMaxRetries.setInitialInterval(retrySettings.getInitialInterval());
        backOffWithMaxRetries.setMultiplier(retrySettings.getMultiplier());
        backOffWithMaxRetries.setMaxInterval(retrySettings.getMaxInterval());
        return backOffWithMaxRetries;
    }

    private DefaultErrorHandler createCommonErrorHandler(
            KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate,
            ExponentialBackOffWithMaxRetries backOffWithMaxRetries) {
        final var defaultErrorHandler =
                new DefaultErrorHandler(
                        deadLetterPublishingRecoverer(kafkaTemplate), backOffWithMaxRetries);

        // Setup Retryable Exceptions here. And add them to the classifications map.
        // All other exceptions default to non-retryable.
        final var classified = new HashMap<Class<? extends Throwable>, Boolean>();
        retryProperties.getRetryableExceptions().forEach(aClass -> classified.put(aClass, true));
        defaultErrorHandler.setClassifications(classified, false);

        defaultErrorHandler.setCommitRecovered(true);

        return defaultErrorHandler;
    }

    private DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate) {

      return new DeadLetterPublishingRecoverer(
          kafkaTemplate,
          (consumerRecord, e) -> {
            log.error("Error while processing [{}]", consumerRecord, e);
            final var topic = determineTopic(consumerRecord, e.getCause());
            log.error("Moving [{}] to [{}]", consumerRecord, topic);
            return new TopicPartition(topic, -1);
          });
    }



Solution

  • See FailedRecordTracker. The logic there is like this:

        long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
        if (nextBackOff != BackOffExecution.STOP) {
            this.backOffHandler.onNextBackOff(container, exception, nextBackOff);
            return false;
        }
        else {
            attemptRecovery(record, exception, topicPartition, consumer);
            map.remove(topicPartition);
            if (map.isEmpty()) {
                this.failures.remove(currentThread);
            }
            return true;
        }
    

    Make sure you indeed exhaust the number of retries.

    It might also be great to have a simple project from you so we can reproduce this and play with it in debug mode.
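
    As a way to check that, a stripped-down handler with a short FixedBackOff and a plain logging recoverer (illustrative names, reusing the log and factory from your configuration) should show the recoverer firing as soon as the retries run out:

        // Diagnostic sketch, not a production configuration: no delay between attempts,
        // at most 2 retries, then the recoverer must be called.
        ConsumerRecordRecoverer loggingRecoverer =
                (consumerRecord, e) ->
                        log.error("Recovered [{}] after the back-off was exhausted", consumerRecord, e);

        DefaultErrorHandler diagnosticHandler =
                new DefaultErrorHandler(loggingRecoverer, new FixedBackOff(0L, 2L));
        factory.setCommonErrorHandler(diagnosticHandler);

    If this recoverer fires but your DeadLetterPublishingRecoverer still does not, the difference is most likely in the back-off or classification settings rather than in the recoverer itself.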