Search code examples
javaasynchronousspring-integrationdslspring-jms

How to consume activemq message asynchronously using JMS, and also maintain transaction?


How can I consume JMS message asynchronously(using task executor), also maintaining transaction? If any exception happens in messageHandler it should be rolled back and requeued in activemq. Currently I am doing this, but it does not pick next message from queue until message handler consume message.(if any message takes lot of time, other messages in queue forced to wait..)

return IntegrtionFlows.from(Consumer.class, gatewayProxySpec -> gatewayProxySpec.beanName(gatewayBeanName))
.channel(Jms.pollableChannel(connectionFactory)
.destination(destinationQueue)
.jmsMessageConverter(jmsMessageConverter)
.sessionTransacted(true))
.handle(messageHandler, e->e.poller(Pollers.fixedDelay(5,TimeUnit.SECONDS).taskExecutor(consumerTaskExecutor).maxMessagesPerPoll(10).transactional(transactionManager()))).get()

I also tried this, which is truly asynchronous but since it breaks transaction boundaries, it's of no use for my use case.

return IntegrtionFlows.from(Consumer.class, gatewayProxySpec -> gatewayProxySpec.beanName(gatewayBeanName))
.channel(Jms.channel(connectionFactory)
.destination(destinationQueue)
.jmsMessageConverter(jmsMessageConverter))
.channel(MessageChannels.executor(consumerTaskExecutor))
.handle(messageHandler)
.get()

Solution

  • For your requirements I would really stay away from MessageChannels.executor() and even Jms.pollableChannel().

    Just use that Jms.channel() and see its concurrentConsumers() option:

    /**
     * Only applies if the {@link #containerType(Class)} is a
     * {@link org.springframework.jms.listener.DefaultMessageListenerContainer}
     * or a {@link org.springframework.jms.listener.SimpleMessageListenerContainer}.
     * @param concurrentConsumers the concurrentConsumers.
     * @return the current {@link JmsMessageChannelSpec}.
     * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setConcurrentConsumers(int)
     * @see org.springframework.jms.listener.SimpleMessageListenerContainer#setConcurrentConsumers(int)
     */
    public S concurrentConsumers(int concurrentConsumers) {
    

    By default it is one - exactly what you have explained with the behavior you see in your current solution.

    So, just increase the number of concurrent consumers and you should see a different picture!