Search code examples
spring-bootibm-mqspring-transactions

Using a shared JmsTransactionManager with spring-boot to read/write messages on same broker without XA


I have a spring-boot service that reads and writes to the same IBM MQ message broker. The process is stand-alone and does not run inside an application container. I want to implement the "Shared Transaction Resource" pattern so that I don't need to configure any JTA/XA transaction management. I have the happy path working, however the following edge case is not rolling-back the message publishing. The read is rolled-back, but the publish is still committed.

Given the MessageListener receives a message

And the message is published to another queue using the same JMS ConnectionFactory

When an Exception is thrown in onMessage() after the messages is published

Then the message is rollback onto the READ queue and is not published to the WRITE queue

My code looks like this...

@Component
public class MyJmsReceiver implements MessageListener
{
    @Autowired MyJmsSender myJmsSender;
    
    @Override
    public void onMessage(Message message)
    {
        myJmsSender.sendMessage("some-payload");
        
        if(true) throw new RuntimeException("BOOM!");
    }
}


@Component
public class MyJmsSender
{
    @Transactional(propagation = Propagation.MANDATORY)
    public void sendMessage(final String payload)
    {
        jmsTemplate.convertAndSend("QUEUE.OUT", payload);
    }
}


@Configuration
@EnableJms
@EnableTransactionManagement
public class Config
{
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory)
    {
        // using a SingleConnectionFactory gives us one reusable connection rather than opening a new one for each message published
        JmsTemplate jmsTemplate = new JmsTemplate(new SingleConnectionFactory(connectionFactory));
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }

    @Bean
    public DefaultMessageListenerContainer defaultMessageListenerContainer(
            ConnectionFactory connectionFactory,
            PlatformTransactionManager transactionManager,
            MyJmsReceiver myJmsReceiver)
    {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory);
        dmlc.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        dmlc.setSessionTransacted(true);
        dmlc.setTransactionManager(transactionManager);
        dmlc.setConcurrency(concurrency);
        dmlc.setDestinationName("QUEUE.IN");
        dmlc.setMessageListener(myJmsReceiver);
        return dmlc;
    }

    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }
    
    @Bean
    public ConnectionFactory connectionFactory(
            @Value("${jms.host}") String host,
            @Value("${jms.port}") int port,
            @Value("${jms.queue.manager}") String queueManager,
            @Value("${jms.channel}") String channel
    ) throws JMSException
    {
        MQConnectionFactory ibmMq = new MQConnectionFactory();
        ibmMq.setHostName(host);
        ibmMq.setPort(port);
        ibmMq.setQueueManager(queueManager);
        ibmMq.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        ibmMq.setChannel(channel);
        return ibmMq;
    }
}

When I enable the logging of the JmsTransactionManager I see that the publish is "Participating in existing transaction", no new txn is created, and the DMLC has rolled back the transaction. However I still see the messages as having been published, while the read message is placed back onto the queue.

2020-09-07_13:21:33.000 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Creating new transaction with name [defaultMessageListenerContainer]: PROPAGATION_REQUIRED,ISOLATION
_DEFAULT
2020-09-07_13:21:33.015 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Created JMS transaction on Session [com.ibm.mq.jms.MQQueueSession@6934ab89] from Connection [com.ibm.mq.jms.MQQueueConnection@bd527da]
2020-09-07_13:21:33.034 [defaultMessageListenerContainer-1] INFO  c.l.c.c.r.MyJmsReceiver - "Read message from QUEUE.IN for messageId ID:414d51204c43482e434c4b2e545354205f49ea352c992702
2020-09-07_13:21:33.054 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Participating in existing transaction
2020-09-07_13:21:33.056 [defaultMessageListenerContainer-1] INFO  c.l.c.c.p.r.MyJmsSender - Sending message to queue: QUEUE.OUT
2020-09-07_13:21:33.077 [defaultMessageListenerContainer-1] ERROR c.l.c.c.r.MyJmsReceiver - Failed to process messageId: ID:414d51204c43482e434c4b2e545354205f49ea352c992702 with RuntimeException: BOOM!
2020-09-07_13:21:33.096 [defaultMessageListenerContainer-1] WARN  o.s.j.l.DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
com.xxx.receive.MessageListenerException: java.lang.RuntimeException: BOOM!
        at com.xxx.MyJmsReceiver.onMessage(MyJmsReceiver.java:83)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:761)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:699)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: BOOM!
        at com.xxx.MyJmsReceiver.onMessage(MyJmsReceiver.java:74)
        ... 9 common frames omitted
2020-09-07_13:21:33.097 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Transactional code has requested rollback
2020-09-07_13:21:33.097 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Initiating transaction rollback
2020-09-07_13:21:33.097 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Rolling back JMS transaction on Session [com.ibm.mq.jms.MQQueueSession@6934ab89]
2020-09-07_13:21:33.107 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Creating new transaction with name [defaultMessageListenerContainer]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2020-09-07_13:21:33.123 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Created JMS transaction on Session [com.ibm.mq.jms.MQQueueSession@8d93093] from Connection [com.ibm.mq.jms.MQQueueConnection@610b3b42]

Is there a way to get this working without implementing a formal XA library like Atomikos?

My understanding is that a ChainedTransactionManager won't solve my problem either because once the inner transaction is committed (i.e. the publish) the outer transaction is unable to roll back that commit.

The publish of the message is the practically last thing that onMessage() executes.


Solution

  • Defining the SingleConnectionFactory in the JmsTemplate is the problem. You will get a new connection and therefore a new session in the sender, which makes reusing the running transaction from the listener impossible.

    Use a CachingDestinationResolver instead of the SingleConnectionFactory to improve performance:

    @Bean
    public CachingDestinationResolver cachingDestinationResolver()
    {
        JndiDestinationResolver destinationResolver = new JndiDestinationResolver();
        destinationResolver.setFallbackToDynamicDestination(true);
        return destinationResolver;
    }
    
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory,
            CachingDestinationResolver destinationResolver)
    {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setDestinationResolver(destinationResolver);
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }
    
    @Bean
    public DefaultMessageListenerContainer defaultMessageListenerContainer(
            ConnectionFactory connectionFactory,
            PlatformTransactionManager transactionManager,
            MyJmsReceiver myJmsReceiver,
            CachingDestinationResolver destinationResolver)
    {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory);
        dmlc.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        dmlc.setSessionTransacted(true);
        dmlc.setTransactionManager(transactionManager);
        dmlc.setConcurrency(concurrency);
        dmlc.setDestinationName("MY.QUEUE.IN");
        dmlc.setDestinationResolver(destinationResolver);
        dmlc.setMessageListener(myJmsReceiver);
        return dmlc;
    }