
Java: XA Transaction propagation within many threads


How can I use a transaction manager (such as Bitronix, JBoss TS or Atomikos) in a Java SE application (not Java EE or Spring) to support the following use case?

Let's assume we have the following class:

public class Dao {

    public void updateDatabase(DB db) {
        // connect to db
        // run a SQL statement
    }

}

and we create a Java Runnable from that, like the following:

public class MyRunnable implements Runnable {

    private final Dao dao;
    private final DB db;

    public MyRunnable(Dao dao, DB db) {
        this.dao = dao;
        this.db = db;
    }

    @Override
    public void run() {
        dao.updateDatabase(db);
    }
}

Now in our Service layer, we have another class:

public class Service {

    public void updateDatabases() throws InterruptedException, ExecutionException {

        // BEGIN TRANSACTION

        ExecutorService es = Executors.newFixedThreadPool(10);

        ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(es);

        List<Future<Void>> futures = new ArrayList<>();

        Dao dao = new Dao();

        // submit(Runnable, result) is the overload that accepts a Runnable
        futures.add(ecs.submit(new MyRunnable(dao, new DB("db1")), null));
        futures.add(ecs.submit(new MyRunnable(dao, new DB("db2")), null));
        futures.add(ecs.submit(new MyRunnable(dao, new DB("db3")), null));

        // wait for every submitted task to finish
        for (int i = 0; i < futures.size(); ++i) {
            ecs.take().get();
        }

        // END TRANSACTION
    }

}

And the client can be a Servlet or any other multi-threaded environment:

public class MyServlet extends HttpServlet {

    @Override
    protected void service(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {

        Service service = new Service();

        try {
            service.updateDatabases();
        } catch (Exception e) {
            throw new ServletException(e);
        }

    }

}

What would be the correct code for the BEGIN TRANSACTION and END TRANSACTION parts? Is this even feasible? If not, what needs to be changed? The requirement is to keep the updateDatabases() method both concurrent (since it will be accessing multiple databases at the same time) and transactional.


Solution

  • It seems this can be done with Atomikos, using SubTxThread:

    //first start a tx
    TransactionManager tm = ...
    tm.begin();
    
    Waiter waiter = new Waiter();
    
    //the code that calls the first EIS; defined by you
    SubTxCode code1 = ...
    
    //the associated thread
    SubTxThread thread1 = new SubTxThread ( waiter , code1 );
    
    //the code that calls the second EIS; defined by you
    SubTxCode code2 = ...
    
    //the associated thread
    SubTxThread thread2 = new SubTxThread ( waiter , code2 );
    
    //start each thread
    thread1.start();
    
    thread2.start();
    
    //wait for completion of all calls
    waiter.waitForAll();
    
    //check result
    if ( waiter.getAbortCount() == 0 ) {
        //no failures -> commit tx
        tm.commit();
    } else {
        tm.rollback();
    }
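
For anyone who wants to see the "wait for all, then commit only if nothing aborted" decision in isolation, here is a minimal sketch that uses only java.util.concurrent. It does not propagate an XA transaction to the worker threads; that part is what the transaction manager has to provide. The Tx interface, the runAll method and the class name are hypothetical names made up for this illustration, and Tx merely stands in for the tm.commit() / tm.rollback() calls above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CommitDecisionSketch {

    /** Hypothetical stand-in for the transaction manager's commit/rollback. */
    interface Tx {
        void commit();
        void rollback();
    }

    /**
     * Runs every unit of work on its own thread, waits for all of them,
     * then commits if none failed and rolls back otherwise.
     * Returns true if the commit path was taken.
     */
    static boolean runAll(Tx tx, List<Callable<Void>> work) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, work.size()));
        List<Future<Void>> futures = new ArrayList<>();
        for (Callable<Void> unit : work) {
            futures.add(pool.submit(unit));
        }

        int abortCount = 0;
        for (Future<Void> f : futures) {
            try {
                f.get(); // blocks until this unit has finished
            } catch (ExecutionException e) {
                abortCount++; // the unit of work threw an exception
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                abortCount++; // treat an interrupted wait as a failure
            }
        }
        pool.shutdown();

        if (abortCount == 0) {
            tx.commit();
            return true;
        }
        tx.rollback();
        return false;
    }

    public static void main(String[] args) {
        Tx logging = new Tx() {
            public void commit() { System.out.println("commit"); }
            public void rollback() { System.out.println("rollback"); }
        };

        List<Callable<Void>> allGood = new ArrayList<>();
        allGood.add(() -> null);
        allGood.add(() -> null);
        runAll(logging, allGood);

        List<Callable<Void>> oneFails = new ArrayList<>();
        oneFails.add(() -> null);
        oneFails.add(() -> { throw new IllegalStateException("simulated failure"); });
        runAll(logging, oneFails);
    }
}
```

With a real transaction manager each unit of work would also have to run inside the parent transaction, which is the job SubTxThread is doing in the snippet above; a plain executor does not do that for you.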