Bitronix Transaction not working with JMS queue


My goal is to use a Bitronix transaction that spans two resources:

  1. Database
  2. JMS

I have the following Java code:

package com.mycompany.app;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import bitronix.tm.BitronixTransactionManager;
import bitronix.tm.TransactionManagerServices;

public class JMSExample {

    static String serverUrl = "tcp://localhost:61616"; // values changed
    static String userName = "admin";
    static String password = "admin";

    static TextMessage message;

    public static void sendTopicMessage(String topicName, String messageStr) {

        Connection connection = null;

        try {
            BitronixTransactionManager btm = TransactionManagerServices.getTransactionManager();
            btm.begin();
            System.out.println("Publishing to destination '" + topicName + "'\n");
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(serverUrl);
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue(topicName);
            Message msg = session.createTextMessage(messageStr);
            msg.setJMSCorrelationID("correlationID1");
            MessageProducer producer = session.createProducer(queue);
            producer.send(msg);
            System.out.println("Published message: " + messageStr);
            session.commit();
            session.close();
            connection.close();
            btm.rollback();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        JMSExample.sendTopicMessage("test", "Hi");
    }
}

When I run the program above, the message appears in the queue even though the Bitronix transaction is rolled back. I want the JMS send to take part in the Bitronix transaction. In other words, if Bitronix rolls back, no message should reach the queue.


Solution

  • To work atomically with two resources in one transaction, you need a javax.transaction.xa.XAResource implementation from each resource manager (here, the JMS broker and the database). The code in the question creates a locally transacted session, and session.commit() commits the send on its own, so the later btm.rollback() has nothing to undo. Once you have the XAResources, enlist them in the JTA transaction you started with the transaction manager. Any commit or rollback is then applied atomically across the enlisted XA resources.

    To get an XAResource from JMS, look at javax.jms.XAConnectionFactory. Call createXAConnection() to get a javax.jms.XAConnection, then createXASession() on it to get a javax.jms.XASession, and finally getXAResource() on the session.
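
A minimal sketch of that JMS chain, assuming the JMS 1.1 API (javax.jms) is on the classpath. The class name JmsXaHandle is made up for illustration and is not part of JMS, ActiveMQ or Bitronix. Note that nothing here calls commit() on the session; that decision is left to the transaction manager.

```java
import javax.jms.JMSException;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;

// Hypothetical holder (illustration only): keeps the XA connection, the XA
// session and the session's XAResource together so they can be closed as a unit.
final class JmsXaHandle implements AutoCloseable {
    final XAConnection connection;
    final XASession session;
    final XAResource xaResource;

    JmsXaHandle(XAConnectionFactory factory) throws JMSException {
        XAConnection c = factory.createXAConnection();
        XASession s;
        try {
            s = c.createXASession();
        } catch (JMSException | RuntimeException e) {
            c.close(); // do not leak the connection if session creation fails
            throw e;
        }
        connection = c;
        session = s;
        // This is the object to enlist in the JTA transaction.
        xaResource = s.getXAResource();
    }

    @Override
    public void close() throws JMSException {
        try {
            session.close();
        } finally {
            connection.close();
        }
    }
}
```

With ActiveMQ, the factory passed in would typically be an org.apache.activemq.ActiveMQXAConnectionFactory created with the same broker URL as in the question, rather than the plain ActiveMQConnectionFactory.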

    To get an XAResource from JDBC, look at javax.sql.XADataSource. Call getXAConnection() to get a javax.sql.XAConnection, then getXAResource() on it.
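
The JDBC side can be sketched the same way using only interfaces that ship with the JDK. The class name JdbcXaHandle is hypothetical, and the XADataSource itself has to come from your JDBC driver.

```java
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAResource;

// Hypothetical holder (illustration only): pairs the XAResource to enlist with
// the ordinary Connection used to run SQL inside the transaction.
final class JdbcXaHandle implements AutoCloseable {
    final XAConnection xaConnection;
    final Connection connection;
    final XAResource xaResource;

    JdbcXaHandle(XADataSource dataSource) throws SQLException {
        XAConnection xc = dataSource.getXAConnection();
        XAResource xr;
        Connection c;
        try {
            xr = xc.getXAResource(); // enlist this in the JTA transaction
            c = xc.getConnection();  // run statements through this handle
        } catch (SQLException | RuntimeException e) {
            xc.close(); // do not leak the physical connection if setup fails
            throw e;
        }
        xaConnection = xc;
        connection = c;
        xaResource = xr;
    }

    @Override
    public void close() throws SQLException {
        try {
            connection.close();
        } finally {
            xaConnection.close();
        }
    }
}
```

While the resource is enlisted, do not call commit() or rollback() on the Connection yourself. The outcome belongs to the transaction manager.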

    Then call getTransaction() on the transaction manager and use the returned javax.transaction.Transaction to invoke enlistResource() with each XAResource. When you then call commit() or rollback(), the transaction manager drives every enlisted XAResource through two-phase commit, so either all of them commit or all of them roll back.
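
Put together, the enlistment step might look roughly like the sketch below. It assumes the JTA API (javax.transaction) is on the classpath and is written against the generic TransactionManager interface, which BitronixTransactionManager implements. The XaTransactions class and its Work interface are hypothetical names for illustration.

```java
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;

// Hypothetical helper (illustration only), not part of JTA or Bitronix.
final class XaTransactions {

    /** The work to perform while the resources are enlisted. */
    interface Work {
        void run() throws Exception;
    }

    static void runAtomically(TransactionManager tm, Work work, XAResource... resources)
            throws Exception {
        tm.begin();
        try {
            Transaction tx = tm.getTransaction();
            // Enlist before doing any work, so the work is tied to this transaction.
            for (XAResource resource : resources) {
                if (!tx.enlistResource(resource)) {
                    throw new IllegalStateException("could not enlist " + resource);
                }
            }
            work.run(); // e.g. send the JMS message and run the SQL update
            // End each resource's association with the transaction before completion.
            for (XAResource resource : resources) {
                tx.delistResource(resource, XAResource.TMSUCCESS);
            }
            tm.commit(); // prepares and commits all enlisted resources
        } catch (Exception e) {
            // If commit() itself failed, the transaction is already gone.
            if (tm.getStatus() != Status.STATUS_NO_TRANSACTION) {
                tm.rollback();
            }
            throw e;
        }
    }
}
```

In the question's code, the send would go inside the Work callback, using a producer created from the XA session, and the explicit session.commit() would be dropped. One caveat: as far as I know, Bitronix only accepts XAResources that belong to a resource registered with it (for example through its own pooling connection factory and data source wrappers), so check its documentation before relying on manual enlistment.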