The use case is to reprocess messages that failed downstream, with a delay, using the Kafka ConcurrentMessageListenerContainer from Spring Integration. Say, the maximum number of retry attempts should be 2, with a fixed delay of 5 minutes.
Is there a solution available in the Spring Integration framework out of the box?
I have gone over DeadLetterPublishingRecoverer, but it only helps with moving the record to a DLT; there is no further processing unless we listen to that same DLT with a poller.
If you are looking for a Spring Integration solution, then the KafkaMessageDrivenChannelAdapter, which uses the mentioned ConcurrentMessageListenerContainer, could be configured with a RetryTemplate:
/**
 * Specify a {@link RetryTemplate} instance to use for retrying deliveries.
 * <p>
 * IMPORTANT: This form of retry is blocking and could cause a rebalance if the
 * aggregate retry delays across all polled records might exceed the
 * {@code max.poll.interval.ms}. Instead, consider adding a
 * {@code DefaultErrorHandler} to the listener container, configured with a
 * {@link KafkaErrorSendingMessageRecoverer}.
 * @param retryTemplate the {@link RetryTemplate} to use.
 */
public void setRetryTemplate(RetryTemplate retryTemplate) {
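To see why the IMPORTANT note matters for the numbers in the question, here is a small worked calculation in plain Java. It is only a sketch: the class and method names are made up for illustration, and it assumes the Kafka consumer default of 300000 ms (5 minutes) for max.poll.interval.ms. Adjust that value if your consumer overrides it.

```java
import java.time.Duration;

public class BlockingRetryBudget {

    // Assumption: Kafka consumer default for max.poll.interval.ms (5 minutes).
    public static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofMillis(300_000);

    /**
     * Worst-case time the consumer thread spends sleeping between retries
     * when every polled record fails and exhausts all of its retries.
     */
    public static Duration worstCaseBlocking(int polledRecords, int retries, Duration fixedDelay) {
        return fixedDelay.multipliedBy((long) polledRecords * retries);
    }

    /** True when the blocking time alone already exceeds the poll interval. */
    public static boolean risksRebalance(Duration blocking, Duration maxPollInterval) {
        return blocking.compareTo(maxPollInterval) > 0;
    }

    public static void main(String[] args) {
        // One failing record, 2 retries, 5 minutes apart.
        Duration blocking = worstCaseBlocking(1, 2, Duration.ofMinutes(5));
        System.out.println("blocking=" + blocking
                + " risksRebalance=" + risksRebalance(blocking, DEFAULT_MAX_POLL_INTERVAL));
    }
}
```

Even a single failing record would keep the consumer thread busy for about 10 minutes, which is twice the default poll interval, so a blocking retry with these settings would likely require raising max.poll.interval.ms or choosing a non-blocking approach.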
Another way with Spring Integration is to retry exactly at the place where you handle the message downstream. For that, a specific endpoint could be configured with a RequestHandlerRetryAdvice: https://docs.spring.io/spring-integration/reference/handler-advice/classes.html#retry-advice
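A minimal sketch of such an advice, assuming Spring Integration 6.x with spring-retry on the classpath (the retry API differs in later versions). The bean name retryAdvice, the channel name processChannel and the class name are hypothetical. Note that maxAttempts counts the initial delivery, so 3 attempts means 2 retries.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.messaging.Message;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class DownstreamRetryConfig {

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(RetryTemplate.builder()
                .maxAttempts(3)          // 1 initial attempt + 2 retries
                .fixedBackoff(300_000L)  // 5 minutes between attempts, in milliseconds
                .build());
        return advice;
    }

    // Hypothetical endpoint: the advice wraps only this handler invocation.
    @ServiceActivator(inputChannel = "processChannel", adviceChain = "retryAdvice")
    public void handle(Message<?> message) {
        // Call the downstream system here; an exception triggers the retry.
    }
}
```

This retry still sleeps on the calling thread, so if the flow runs on the Kafka consumer thread (direct channels all the way), the same max.poll.interval.ms concern applies. Without a recovery callback, the last exception is thrown back to the caller once the attempts are exhausted.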
UPDATE
Is there an equivalent of @EnableKafkaRetryTopic in Spring Integration?
No, there is no way to apply @RetryableTopic to Spring Integration's KafkaMessageDrivenChannelAdapter. The problem is that this non-blocking retry is designed specifically for declarative, annotation-based POJO listeners, whereas the channel adapter is optimized for injecting a regular MessageListener into the target listener container.
I suggest looking into a @KafkaListener with @RetryableTopic instead, and using a @MessagingGateway to call the Spring Integration flow from that listener method: https://docs.spring.io/spring-integration/reference/gateway.html. You may even just expect a Message<?> in your @KafkaListener method and send it into an injected MessageChannel.
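The last suggestion could look roughly like this. It is a sketch, assuming Spring for Apache Kafka 3.x (where the backoff annotation comes from spring-retry) and a Spring Boot style setup that provides the retry topic infrastructure. The topic name orders, the group id orders-group, the channel bean name processingChannel and the class name are all hypothetical.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class RetryingKafkaBridge {

    private final MessageChannel processingChannel;

    public RetryingKafkaBridge(@Qualifier("processingChannel") MessageChannel processingChannel) {
        this.processingChannel = processingChannel;
    }

    // attempts includes the first delivery: 3 attempts = 2 retries, 5 minutes apart.
    @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 300_000))
    @KafkaListener(topics = "orders", groupId = "orders-group")
    public void listen(Message<?> message) {
        // Hand the record over to the Spring Integration flow.
        processingChannel.send(message);
    }
}
```

For a downstream failure to trigger the retry topic, the exception has to propagate back to the listener method. That is the case when the flow uses direct channels; with an executor or queue channel in between, the hand-off succeeds immediately and the listener never sees the failure.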