java, spring, apache-kafka, spring-kafka

Spring-Kafka 2.6.5 infinite retry policy via stateful retry and SeekToCurrentErrorHandler


As the title suggests, I'm using spring-kafka version 2.6.5. In my architecture, I have a main topic that has a SimpleRetryPolicy paired with an ExponentialBackOffPolicy. If the retry attempts are exhausted, a RecoveryCallback sends the message to an error topic. The error topic is where my problems lie.
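For context, the main-topic arrangement described above (a bounded SimpleRetryPolicy with an ExponentialBackOffPolicy, and a RecoveryCallback that forwards to the error topic) could be wired roughly like this with the spring-kafka 2.6 factory API. This is a sketch, not the asker's actual code: the class name `MainTopicConfig`, the bean name `kafkaMainListenerContainerFactory`, the topic `my_error_topic`, and the attempt count and intervals are all hypothetical, and it assumes `ConsumerFactory<String, String>` and `KafkaTemplate<String, String>` beans exist.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
class MainTopicConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaMainListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTemplate<String, String> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // Bounded, in-memory retries (hypothetical values).
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1_000L);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10_000L);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        factory.setRetryTemplate(retryTemplate);

        // After the last failed attempt, publish the record's value to the error topic.
        // get() blocks until the send completes, so the offset is not committed before the
        // message has reached the error topic.
        factory.setRecoveryCallback(context -> {
            ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>)
                    context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
            kafkaTemplate.send("my_error_topic", (String) record.value()).get();
            return null;
        });
        return factory;
    }
}
```

With this non-stateful setup, all attempts and back-off pauses for a record happen inside a single listener invocation, so their total duration counts against max.poll.interval.ms.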

For the error topic, I need to be able to retry indefinitely without letting any message be dropped. By "dropped" I mean that if the Spring application crashes (or something equally bad happens), any message that was in the middle of processing must be re-polled when the application comes back up (order doesn't matter). Basically, I think I need to configure the acks so that offsets are committed only after processing is done. As for retrying indefinitely, I've found a number of helpful answers from users like Gary Russell. Unfortunately, differences between spring-kafka versions and deprecations have made it hard to piece together a clear solution for my needs and version.

Currently, my setup looks like this:

@KafkaListener(topics = "my_topic", 
               groupId = "my_group_id", 
               containerFactory = "kafkaErrorListenerContainerFactory")
public void listenErrorTopic(String message) throws Exception {
    processingLogic(message);
    // Do I need to manually ACK afterwards (and thus also include additional params to access needed 
    // message components)?
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    ...
    // Basing the need for the below 2 props off of previously found posts
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    // Unsure if the below prop is needed
    // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    ...
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaErrorListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // A previous post said that infinite retries could only be achieved via stateful retry and STCEH,
    // but is there an alternative in 2.6?
    factory.setStatefulRetry(true);
    // A previous post had '-1' passed to SeekToCurrentErrorHandler, but that is no longer possible.
    // It was suggested instead to pass Long.MAX_VALUE to the backoff period for later versions, but the 
    // policy shown was a FixedBackOffPolicy.
    factory.setErrorHandler(new SeekToCurrentErrorHandler());

    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
    // Do I need a recovery callback set in my RetryTemplate if I want it to be infinite?
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(<props file value insertion here>);
    backOffPolicy.setMultiplier(<props file value insertion here>);
    backOffPolicy.setMaxInterval(<props file value insertion here>);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    factory.setRetryTemplate(retryTemplate);

    return factory;
}

Ideally I'd prefer an exponential back off over a fixed one, but my main concern is being able to retry indefinitely without exceeding max.poll.interval.ms and triggering a rebalance. I left comments in the code where I'm unsure. If someone could clarify things, it would be greatly appreciated!


Solution

  • Stateful retry was specifically designed for use with the STCEH to avoid a rebalance, back when the STCEH did not yet support back offs.

    However, now that the STCEH supports back off, it is better to use it instead of a retry template.

    If you use both, the actual number of retries is the product of the STCEH retries and the retry template retries.

    Now that the SeekToCurrentErrorHandler can be configured with a BackOff and has the ability to retry only certain exceptions (since version 2.3), the use of stateful retry, via the listener adapter retry configuration, is no longer necessary. You can provide the same functionality with appropriate configuration of the error handler and remove all retry configuration from the listener adapter. See Seek To Current Container Error Handlers for more information.

    The configuration is much simpler.
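    Applied to the error-topic factory from the question, the simpler configuration might look like the following sketch (spring-kafka 2.6 API, with spring-core's `ExponentialBackOff`). The class name `ErrorTopicConfig` and the interval values are illustrative assumptions, and the `ConsumerFactory` is assumed to be the question's `consumerFactory()` bean, injected as a parameter.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.ExponentialBackOff;

@Configuration
class ErrorTopicConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaErrorListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // No RetryTemplate and no setStatefulRetry(true): the error handler does all the retrying.
        ExponentialBackOff backOff = new ExponentialBackOff(1_000L, 2.0); // hypothetical initial interval and multiplier
        backOff.setMaxInterval(60_000L); // keep well below max.poll.interval.ms (default 300000 ms)
        // maxElapsedTime is left at its default (unlimited), so the record is retried forever.
        factory.setErrorHandler(new SeekToCurrentErrorHandler(backOff));
        return factory;
    }
}
```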

    You don't need to use manual acks; the container commits the offsets after the listener returns successfully, according to the AckMode: BATCH (the default) or RECORD. RECORD is more costly but reduces the chance of redelivery.
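    If you want per-record commits, the ack mode is set on the factory's container properties. A minimal sketch, assuming spring-kafka 2.6 (where `AckMode` is nested in `ContainerProperties`); the helper class `AckModeConfig` and method `commitPerRecord` are hypothetical names, and the question's listener method can stay unchanged, with no `Acknowledgment` parameter.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;

final class AckModeConfig {

    private AckModeConfig() {
    }

    // Commit each record's offset once the listener has returned normally for it,
    // instead of once per batch returned by poll() (BATCH, the default).
    static void commitPerRecord(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    }
}
```

    Call it on the factory before returning it from the bean method. A record whose processing keeps failing never completes successfully, so its offset is not committed and it is polled again after a restart.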

    For infinite retries with a fixed interval, use a FixedBackOff with its maxAttempts property set to UNLIMITED_ATTEMPTS (Long.MAX_VALUE).
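    A minimal sketch of that fixed-interval variant, assuming spring-kafka 2.6 and spring-core's `org.springframework.util.backoff.FixedBackOff`; the helper class `InfiniteFixedRetry` and its `errorHandler` method are hypothetical, and the interval is left to the caller.

```java
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

final class InfiniteFixedRetry {

    private InfiniteFixedRetry() {
    }

    // Retries the failed record forever, waiting intervalMs between attempts.
    // Keep intervalMs below max.poll.interval.ms: in 2.6 the error handler waits out
    // the interval on the consumer thread before the next poll.
    static SeekToCurrentErrorHandler errorHandler(long intervalMs) {
        return new SeekToCurrentErrorHandler(
                new FixedBackOff(intervalMs, FixedBackOff.UNLIMITED_ATTEMPTS));
    }
}
```

    It would be registered with `factory.setErrorHandler(InfiniteFixedRetry.errorHandler(5_000L))`, where 5000 ms is just an example value.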

    The ExponentialBackOff retries indefinitely by default (its maxElapsedTime is unlimited). You just need to make sure that the maxInterval is less than max.poll.interval.ms to avoid a rebalance.
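    To see why capping maxInterval is enough, here is a small standalone model in plain Java (no Spring) of the pauses an exponential back off produces when each interval is capped at maxInterval, plus the check described above. It mirrors the documented behaviour of `ExponentialBackOff` (each interval is the previous one times the multiplier, capped at maxInterval) but is not Spring's implementation; the class `BackOffModel`, its methods, the processing headroom parameter, and the sample numbers are all illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

final class BackOffModel {

    private BackOffModel() {
    }

    // First `count` pauses: initial, initial*m, initial*m^2, ... each capped at maxInterval.
    // Without an attempt or elapsed-time limit the sequence never ends; once the cap is
    // reached, every further pause equals maxInterval, so no single pause grows past it.
    static List<Long> intervals(long initial, double multiplier, long maxInterval, int count) {
        List<Long> result = new ArrayList<>();
        long current = initial;
        for (int i = 0; i < count; i++) {
            long capped = Math.min(current, maxInterval);
            result.add(capped);
            current = (long) Math.min((double) maxInterval, capped * multiplier);
        }
        return result;
    }

    // The longest pause plus an assumed allowance for processing time must stay
    // under max.poll.interval.ms, otherwise the consumer is considered failed.
    static boolean avoidsRebalance(long maxInterval, long processingHeadroomMs, long maxPollIntervalMs) {
        return maxInterval + processingHeadroomMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // prints [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000, 60000]
        System.out.println(intervals(1_000L, 2.0, 60_000L, 9));
        // prints true: 60 s cap + 10 s headroom is under the 300 s default
        System.out.println(avoidsRebalance(60_000L, 10_000L, 300_000L));
        // prints false: a 10 minute cap would exceed the default
        System.out.println(avoidsRebalance(600_000L, 10_000L, 300_000L));
    }
}
```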