Tags: spring, spring-integration, spring-amqp, spring-integration-dsl

Spring Integration (Retry Strategy)


I want to create a simple IntegrationFlow with Spring Integration, and I am having difficulties.

I want the flow to take messages from a RabbitMQ queue and post them to a REST endpoint.

The problem is that when a request fails, it is retried endlessly. How can I implement a retry strategy in this code? For example, I want 3 retries: the first after 1 second, the second after 5 seconds, and the third after 1 minute.


        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        RestTemplate restTemplate = new RestTemplate();
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(BOUTIQUE_QUEUE_NAME);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
                .handle(msg ->
                {
                    String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
                    HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
                    restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
                    System.out.println(msgString);
                   
                })
                .get();
    }

Solution

  • Add a retry interceptor to the listener container's advice chain. See https://docs.spring.io/spring-amqp/docs/2.2.10.RELEASE/reference/html/#retry and https://docs.spring.io/spring-amqp/docs/2.2.10.RELEASE/reference/html/#async-listeners

    EDIT

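    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.amqp.dsl.Amqp;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;

    @SpringBootApplication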
    public class So63596805Application {
    
        private static final Logger LOG = LoggerFactory.getLogger(So63596805Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So63596805Application.class, args);
        }
    
        @Bean
        IntegrationFlow flow(SimpleRabbitListenerContainerFactory factory, RabbitTemplate template) {
            SimpleMessageListenerContainer container = factory.createListenerContainer();
            container.setQueueNames("foo");
            container.setAdviceChain(RetryInterceptorBuilder.stateless()
                    .maxAttempts(5)
                    .backOffOptions(1000, 2.0, 10000)
                    .recoverer((msg, cause) -> LOG.info("Retries exhausted for " + msg))
                    .build());
            return IntegrationFlows.from(Amqp.inboundAdapter(container))
                    .handle(msg -> {
                        LOG.info(msg.getPayload().toString());
                        throw new RuntimeException("test");
                    })
                    .get();
        }
    
    }
    

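    This uses an exponential back off policy: with the settings above (5 attempts, initial interval 1000 ms, multiplier 2.0, maximum 10000 ms) you get up to 4 retries, after 1, 2, 4 and 8 seconds.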

    If you use

    .maxAttempts(4)
    .backOffOptions(1000, 5.0, 60000)
    

    You will get 3 retries after 1, 5 and 25 seconds.

    Using 1000, 8.0, 60000 instead will give you 1, 8 and 60 seconds (the third interval would be 64 seconds, but it is capped at the 60000 ms maximum).
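
The arithmetic behind those numbers can be checked with a small plain-Java model. This is only a sketch of the usual exponential rule (each interval is the previous one times the multiplier, capped at the maximum); it is not Spring Retry's own code, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ExponentialBackOffDelays {

    /**
     * Models the waits (in ms) between attempts for an exponential back off:
     * delay_n = min(initialInterval * multiplier^(n-1), maxInterval).
     * Hypothetical helper, not part of any Spring API.
     */
    public static List<Long> delays(int maxAttempts, long initialInterval,
                                    double multiplier, long maxInterval) {
        List<Long> result = new ArrayList<>();
        double next = initialInterval;
        // maxAttempts includes the first delivery, so there are maxAttempts - 1 retries.
        for (int retry = 1; retry < maxAttempts; retry++) {
            result.add(Math.min((long) next, maxInterval));
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(5, 1000, 2.0, 10000)); // [1000, 2000, 4000, 8000]
        System.out.println(delays(4, 1000, 5.0, 60000)); // [1000, 5000, 25000]
        System.out.println(delays(4, 1000, 8.0, 60000)); // [1000, 8000, 60000]
    }
}
```

No multiplier produces both 5 and 60 seconds from a 1-second start, which is why an exponential policy cannot hit 1, 5, 60 exactly.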

    If you must have your exact spec (1, 5 and 60 seconds), you will need a custom BackOffPolicy.
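
To illustrate what such a custom policy would have to do, here is a minimal plain-Java sketch of a retry loop driven by a fixed list of delays. It deliberately avoids Spring so it runs on its own; the class name, method name and the injectable sleeper are assumptions made for this example, not part of Spring Retry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

public class FixedSequenceRetry {

    /** The delays asked for in the question: 1 s, 5 s, then 60 s. */
    public static final List<Long> DELAYS_MS = List.of(1_000L, 5_000L, 60_000L);

    /**
     * Runs the task once, then retries once per entry in delaysMs,
     * waiting for that entry's duration before each retry.
     * The sleeper is injectable so the schedule can be checked without real waiting;
     * a real caller would pass something that actually sleeps.
     */
    public static <T> T callWithRetry(Callable<T> task, List<Long> delaysMs,
                                      LongConsumer sleeper) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= delaysMs.size(); attempt++) {
            if (attempt > 0) {
                sleeper.accept(delaysMs.get(attempt - 1));
            }
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and move on to the next delay
            }
        }
        throw last; // retries exhausted: rethrow the most recent failure
    }

    public static void main(String[] args) {
        List<Long> slept = new ArrayList<>();
        try {
            // Record the waits instead of sleeping, so the demo finishes immediately.
            callWithRetry(() -> { throw new IllegalStateException("endpoint down"); },
                    DELAYS_MS, ms -> slept.add(ms));
        } catch (Exception e) {
            System.out.println("Retries exhausted after waits of " + slept + " ms");
        }
    }
}
```

In Spring Retry the same idea would live in a class implementing BackOffPolicy (its start and backOff methods), keeping the attempt index in the back off context and sleeping for the matching entry. That policy could then be passed to the interceptor builder through backOffPolicy(...) in place of backOffOptions(...).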