spring-integration, spring-kafka

Spring Integration Kafka Delayed Retry


The use case is to reprocess messages that failed downstream, after a delay, using the ConcurrentMessageListenerContainer from Spring Integration's Kafka support. Say the maximum number of retry attempts is 2, with a fixed delay of 5 minutes.

Is there a solution available in the Spring Integration framework out of the box?

I have gone over DeadLetterPublishingRecoverer; it only helps with moving records to a DLT, with no further processing unless we listen to that same DLT with a poller.


Solution

  • If you are looking for a Spring Integration solution, the KafkaMessageDrivenChannelAdapter, which uses the mentioned ConcurrentMessageListenerContainer, can be configured with a RetryTemplate:

    /**
     * Specify a {@link RetryTemplate} instance to use for retrying deliveries.
     * <p>
     * IMPORTANT: This form of retry is blocking and could cause a rebalance if the
     * aggregate retry delays across all polled records might exceed the
     * {@code max.poll.interval.ms}. Instead, consider adding a
     * {@code DefaultErrorHandler} to the listener container, configured with a
     * {@link KafkaErrorSendingMessageRecoverer}.
     * @param retryTemplate the {@link RetryTemplate} to use.
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
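
    The warning in that Javadoc can be checked with simple arithmetic for the numbers in the question. The sketch below is plain Java, not part of Spring; the class and method names are hypothetical, and it assumes the consumer keeps Kafka's default max.poll.interval.ms of 300000 ms (5 minutes).

    ```java
    import java.time.Duration;

    // Hypothetical helper: estimates how long blocking retries hold the consumer thread.
    public class BlockingRetryBudget {

        // Back-off time spent on one failing record: each retry sleeps for the fixed delay.
        public static Duration worstCaseBackOff(int maxRetries, Duration fixedDelay) {
            return fixedDelay.multipliedBy(maxRetries);
        }

        // True if the back-off sleeps for all failing records in one poll exceed the poll interval.
        public static boolean exceedsPollInterval(int failingRecords, int maxRetries,
                Duration fixedDelay, Duration maxPollInterval) {
            Duration total = worstCaseBackOff(maxRetries, fixedDelay).multipliedBy(failingRecords);
            return total.compareTo(maxPollInterval) > 0;
        }

        public static void main(String[] args) {
            Duration delay = Duration.ofMinutes(5);                // fixed delay from the question
            Duration maxPollInterval = Duration.ofMillis(300_000); // assumed Kafka default

            System.out.println(worstCaseBackOff(2, delay));                        // PT10M
            System.out.println(exceedsPollInterval(1, 2, delay, maxPollInterval)); // true
        }
    }
    ```

    With 2 retries at 5 minutes each, a single failing record already blocks the thread for 10 minutes, so a blocking RetryTemplate with these settings would likely trigger a rebalance unless max.poll.interval.ms is raised well above that.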
    

    Another way with Spring Integration is to retry at the exact point where you handle the message downstream. For that, a specific endpoint can be configured with a RequestHandlerRetryAdvice: https://docs.spring.io/spring-integration/reference/handler-advice/classes.html#retry-advice
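
    A minimal sketch of such an advice, assuming Spring Integration 6.x with Spring Retry on the classpath (newer generations may expose a different retry API). The channel name "fromKafka" and the class names are hypothetical; 3 attempts means one initial delivery plus the 2 retries from the question.

    ```java
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
    import org.springframework.messaging.Message;
    import org.springframework.retry.support.RetryTemplate;
    import org.springframework.stereotype.Component;

    @Configuration
    public class DownstreamRetryConfig {

        @Bean
        public RequestHandlerRetryAdvice retryAdvice() {
            RetryTemplate retryTemplate = RetryTemplate.builder()
                    .maxAttempts(3)          // 1 initial attempt + 2 retries
                    .fixedBackoff(300_000L)  // fixed 5-minute delay, in milliseconds
                    .build();
            RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
            advice.setRetryTemplate(retryTemplate);
            return advice;
        }
    }

    // Hypothetical downstream endpoint; in a real project it would live in its own file.
    @Component
    class DownstreamHandler {

        // Only this endpoint is retried; "fromKafka" is an assumed channel name.
        @ServiceActivator(inputChannel = "fromKafka", adviceChain = "retryAdvice")
        public void handle(Message<?> message) {
            // call the downstream system here; a thrown exception triggers the retry
        }
    }
    ```

    Note that this retry is still blocking: if the flow runs on the listener container's thread, the 5-minute back-off sleeps count against max.poll.interval.ms just as they do with the adapter-level RetryTemplate.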

    UPDATE

    Is there an equivalent of @EnableKafkaRetryTopic in Spring Integration?

    No, there is no way to apply @RetryableTopic to Spring Integration's KafkaMessageDrivenChannelAdapter. The problem is that this non-blocking retry is designed specifically for declarative, annotation-based POJOs, whereas the channel adapter is optimized for injecting a regular MessageListener into the target listener container.

    I suggest looking into a @KafkaListener with @RetryableTopic instead, and using a @MessagingGateway to call the Spring Integration flow from that listener method: https://docs.spring.io/spring-integration/reference/gateway.html. You can even just accept a Message<?> in your @KafkaListener method and send it to an injected MessageChannel.
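
    A rough sketch of that combination, assuming Spring for Apache Kafka 3.x (where @RetryableTopic takes Spring Retry's @Backoff) and that retry-topic support is enabled via @EnableKafkaRetryTopic or Spring Boot auto-configuration. The topic, group, channel, and type names are all hypothetical.

    ```java
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.RetryableTopic;
    import org.springframework.messaging.Message;
    import org.springframework.retry.annotation.Backoff;
    import org.springframework.stereotype.Component;

    // Hypothetical gateway into the integration flow that starts at "ordersFlowInput".
    // In a real project it would live in its own file.
    @MessagingGateway(defaultRequestChannel = "ordersFlowInput")
    interface OrdersFlowGateway {

        void process(Message<?> message);
    }

    @Component
    public class OrdersListener {

        private final OrdersFlowGateway gateway;

        public OrdersListener(OrdersFlowGateway gateway) {
            this.gateway = gateway;
        }

        // attempts = "3": one initial delivery plus 2 retries; fixed 5-minute delay.
        @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 300_000))
        @KafkaListener(topics = "orders", groupId = "orders-group")
        public void listen(Message<?> message) {
            // An exception thrown by the flow propagates back here and
            // sends the record to the retry topic instead of blocking.
            gateway.process(message);
        }
    }
    ```

    The exception only reaches the listener if the flow behind the gateway runs on the calling thread (direct channels) and no error channel is configured on the gateway; otherwise the failure is handled elsewhere and no retry is scheduled.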