Tags: spring, apache-kafka, spring-kafka

DefaultAfterRollbackProcessor without BackOff for batch listener


I'm trying to configure the error handling for a batch listener, such that if a single record fails to be processed, the entire batch is sent to the dead letter topic (with some additional logging). I'm using transactions.

This is the configuration of my listener:

    @Transactional("datasourceTransactionManager")
    @KafkaListener(
            id = "myId",
            idIsGroup = false,
            topics = "commandTopic",
            containerFactory = "containerFactoryDlqErrorHandling",
            batch = "true"
    )
    @SendTo("replyTopic")
    public List<Message<V>> listen(List<ConsumerRecord<String, String>> records) {
        
        ...
    }

and the containerFactoryDlqErrorHandling has the following configuration:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> containerFactoryDlqErrorHandling(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            KafkaProperties kafkaProperties,
            DeadletterRecoverer defaultRecoverer,
            KafkaTemplate<?, ?> kafkaTemplate,
            BackOff backOff) {
        
        var customContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(customContainerFactory,
                             new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
        var afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(
                defaultRecoverer,
                backOff,
                kafkaTemplate,
                true);
        afterRollbackProcessor.defaultFalse();
        afterRollbackProcessor.addRetryableExceptions(DataAccessResourceFailureException.class,
                TransactionException.class);
        customContainerFactory.setAfterRollbackProcessor(afterRollbackProcessor);
        customContainerFactory.setContainerCustomizer(container -> {
            container.getContainerProperties().setBatchRecoverAfterRollback(true);
        });
        return customContainerFactory;
    }
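For completeness, the injected BackOff bean could be something as simple as a fixed back-off. This is only a sketch; the interval and retry count below are illustrative, not values from the original setup:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class BackOffConfig {

    @Bean
    public BackOff backOff() {
        // wait 1 s between attempts, retry at most 3 times (illustrative values)
        return new FixedBackOff(1000L, 3L);
    }
}
```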

This behaves almost exactly as expected, except that before moving the messages to the dead letter topic, the DefaultAfterRollbackProcessor always retries processing them with the configured back-off, even when the thrown exception is not registered as retryable (e.g. ListenerExecutionFailedException).

How can I avoid the undesired retry attempts? Do I need to write my own AfterRollbackProcessor?


Solution

  • The batch handling in DefaultAfterRollbackProcessor looks like this:

    public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList, Consumer<K, V> consumer,
            @Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {
    
        if (recoverable && isCommitRecovered()) {
            long nextBackOff = ListenerUtils.nextBackOff(this.backOff, this.backOffs);
            if (nextBackOff != BackOffExecution.STOP) {
                SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger);
                try {
                    ListenerUtils.stoppableSleep(container, nextBackOff);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return;
            }
            // ... (remainder of the method, which recovers the batch, omitted)

    So, when the container property batchRecoverAfterRollback == true, the back-off is applied unconditionally: there is no logic that consults the thrown exception to decide whether the sleep (and the seek back to the beginning of the batch) should be skipped.

    You could raise a GitHub issue asking for such an improvement, but for now it does look like you have to write your own AfterRollbackProcessor to override that default behavior.
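    One way to do that without re-implementing the recovery logic is a thin delegating processor. This is a sketch only, assuming Spring Kafka 3.x signatures; the class name and the Predicate-based classification are my own invention, not part of the framework. The idea: route non-retryable exceptions to a second DefaultAfterRollbackProcessor configured with FixedBackOff(0, 0), whose back-off execution returns STOP on the first call, so that delegate falls straight through to recovery (the dead letter publish) without sleeping.

    ```java
    import java.util.List;
    import java.util.function.Predicate;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.springframework.kafka.core.KafkaOperations;
    import org.springframework.kafka.listener.AfterRollbackProcessor;
    import org.springframework.kafka.listener.ConsumerRecordRecoverer;
    import org.springframework.kafka.listener.ContainerProperties.EOSMode;
    import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
    import org.springframework.kafka.listener.ListenerExecutionFailedException;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.util.backoff.BackOff;
    import org.springframework.util.backoff.FixedBackOff;

    // Hypothetical delegating processor: picks one of two delegates per failure.
    public class ClassifyingAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {

        private final DefaultAfterRollbackProcessor<K, V> retrying;
        private final DefaultAfterRollbackProcessor<K, V> immediate;
        private final Predicate<Throwable> retryable;

        public ClassifyingAfterRollbackProcessor(ConsumerRecordRecoverer recoverer, BackOff backOff,
                KafkaOperations<?, ?> template, Predicate<Throwable> retryable) {
            this.retrying = new DefaultAfterRollbackProcessor<>(recoverer, backOff, template, true);
            // FixedBackOff(0, 0) yields STOP immediately, so this delegate
            // skips the sleep and recovers the batch right away.
            this.immediate = new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 0L),
                    template, true);
            this.retryable = retryable;
        }

        @Override
        public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
                MessageListenerContainer container, Exception exception, boolean recoverable,
                EOSMode eosMode) {
            choose(exception).process(records, consumer, container, exception, recoverable, eosMode);
        }

        @Override
        public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
                Consumer<K, V> consumer, MessageListenerContainer container, Exception exception,
                boolean recoverable, EOSMode eosMode) {
            choose(exception).processBatch(records, recordList, consumer, container, exception,
                    recoverable, eosMode);
        }

        @Override
        public boolean isProcessInTransaction() {
            return this.retrying.isProcessInTransaction();
        }

        @Override
        public void clearThreadState() {
            this.retrying.clearThreadState();
            this.immediate.clearThreadState();
        }

        private DefaultAfterRollbackProcessor<K, V> choose(Exception exception) {
            // The container wraps listener failures in ListenerExecutionFailedException,
            // so classify the cause when one is present.
            Throwable cause = exception instanceof ListenerExecutionFailedException
                    && exception.getCause() != null ? exception.getCause() : exception;
            return this.retryable.test(cause) ? this.retrying : this.immediate;
        }
    }
    ```

    It could then be wired into the container factory in place of the DefaultAfterRollbackProcessor, with a predicate mirroring the retryable-exception setup from the question, e.g. ex -> ex instanceof DataAccessResourceFailureException || ex instanceof TransactionException.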