Search code examples
spring-integrationspring-integration-dsl

Spring Integration redelivery via errorChannel throw with JmsTransactionManager doesnt honor maximumRedeliveries


Related to SO question: Spring Integration Java DSL using JMS retry/redlivery

Using a transacted poller and JmsTransactionManager on a connectionFactory with maximumRedeliveries set to 3 results in a doubling of the actual redlievery attempts.

How can I get this to honor the redelivery settings of the connection factory?

My connectionFactory is built as:

 @Bean (name="spring-int-connection-factory")
    ActiveMQConnectionFactory jmsConnectionFactory(){
        return buildConnectionFactory(
                brokerUrl,
                DELAY_2_SECS,
                MAX_REDELIVERIES,
                "spring-int");
    }

 public static ActiveMQConnectionFactory buildConnectionFactory(String brokerUrl, Long retryDelay, Integer maxRedeliveries, String clientIdPrefix){
        ActiveMQConnectionFactory amqcf = new ActiveMQConnectionFactory();
        amqcf.setBrokerURL(brokerUrl);
        amqcf.setClientIDPrefix(clientIdPrefix);
        if (maxRedeliveries != null) {
            if (retryDelay == null) {
                retryDelay = 500L;
            }
            RedeliveryPolicy rp = new org.apache.activemq.RedeliveryPolicy();
            rp.setInitialRedeliveryDelay(retryDelay);
            rp.setRedeliveryDelay(retryDelay);
            rp.setMaximumRedeliveries(maxRedeliveries);
        }
        return amqcf;
    }

My flow with poller is as:

@Bean
    public IntegrationFlow flow2(@Qualifier("spring-int-connection-factory") ConnectionFactory connectionFactory) {

        IntegrationFlow flow =  IntegrationFlows.from(
                Jms.inboundAdapter(connectionFactory)
                        .configureJmsTemplate(t -> t.receiveTimeout(1000).sessionTransacted(true))
                        .destination(INPUT_DIRECT_QUEUE),
                e -> e.poller(Pollers
                        .fixedDelay(5000)
                        .transactional()
                        .errorChannel("customErrorChannel")
                        .maxMessagesPerPoll(2))
        ).handle(this.msgHandler).get();

        return flow;
    }

My errorChannel handler simply re-throws which causes JMS redelivery to happen.

When I run this with the handler set to always throw an exception, I see that the message handler actually receives the message 7 times (1 initial and 6 redeliveries).

I expected only 3 redeliveries according to my connectionFactory config.

Any ideas what is causing the doubling of attempts and how to mitigate it?


Solution

  • This works fine for me - stops at 4...

    @SpringBootApplication
    public class So51792909Application {
    
        private static final Logger logger = LoggerFactory.getLogger(So51792909Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So51792909Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(JmsTemplate template) {
            return args -> {
                for (int i = 0; i < 1; i++) {
                    template.convertAndSend("foo", "test");
                }
            };
        }
    
        @Bean
        public IntegrationFlow flow(ConnectionFactory connectionFactory) {
            return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory)
                            .destination("foo"), e -> e
                                .poller(Pollers
                                        .fixedDelay(5000)
                                        .transactional()
                                        .maxMessagesPerPoll(2)))
                    .handle((p, h) -> {
                        System.out.println(h.get("JMSXDeliveryCount"));
                        try {
                            Thread.sleep(2000);
                        }
                        catch (InterruptedException e1) {
                            Thread.currentThread().interrupt();
                        }
                        throw new RuntimeException("foo");
                    })
                    .get();
        }
    
        @Bean
        public JmsTransactionManager transactionManager(ConnectionFactory cf) {
            return new JmsTransactionManager(cf);
        }
    
        @Bean
        public ActiveMQConnectionFactory amqCF() {
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
            RedeliveryPolicy rp = new RedeliveryPolicy();
            rp.setMaximumRedeliveries(3);
            cf.setRedeliveryPolicy(rp);
            return cf;
        }
    
        public CachingConnectionFactory connectionFactory() {
            return new CachingConnectionFactory(amqCF());
        }
    
        @JmsListener(destination = "ActiveMQ.DLQ")
        public void listen(String in) {
            logger.info(in);
        }
    
    }