Search code examples
javajbossejbactivemq-classicmessage-driven-bean

MessageDrivenBean consuming JMS message twice


I'm currently running an application within Jboss. My application need to consume JMS message from an ActiveMQ (my ActiveMQ is a module of my Jboss, I followed this procedure : https://developer.jboss.org/wiki/IntegrationOfJBossAS7WithActiveMQ).

As you can see on that link, I use a MessageDrivenBean in order to consume messages on my queue :

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "destination", propertyValue = "Mongo-DB")})
public class MongoConsumer implements MessageListener {

@Override
public void onMessage(Message message) {
    TextMessage textMessage = (TextMessage) message;

    try {
        json = textMessage.getText();
        collectionId = JsonUtils.extract(json, "_collectionId");
        uuid = JsonUtils.extract(json, "_uuid");

        queriesMongoDB.save(collectionId, json);
        LOGGER.info("Insert in mongo : {}", uuid);
    } catch (TechnicalException e) {
        LOGGER.error("Something went wrong while calling Mongo : {}", e);
        this.rollbackMdb(json);
    } catch (JMSException e) {
        LOGGER.error("Something went wrong with the Mongo Consumer", e);
    }

}
}

My session is currently started in Auto-Acknowledge mode :

connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

All of this is working really fine when I'm using it with few messages. But when I benchmark my application, and sending a lot of messages, sometimes, the MDB is consuming twice the same message, and saving it in MongoDB, twice again, of course.

The problem is, it's only happening when I have a lot of messages (like 200k), I got like ~10 duplicates.

It seems that a thread is taking the message, in order to process it, and in the meantime, an other thread is doing the exact same thing.

I also changed the type of my JMS session, in CLIENT_acknowledge mode, and adding :

 message.acknowledge();

At the beggining of my method, but that didn't help.

PS : Sorry for my bad english.

EDIT :

I just reproduce the bug and read the server.log and I got this type of error for a duplicate :

16:41:35,376 WARN  [org.apache.activemq.TransactionContext] (default-threads - 39) commit of: XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:f
fff0a48263f:-669b4574:57e296f4:23fc96] failed with: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a
48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23f
c95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4
        at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1420) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.TransactionContext.syncSendPacketWithInterruptionHandling(TransactionContext.java:761) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.TransactionContext.commit(TransactionContext.java:562) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.ra.LocalAndXATransaction.commit(LocalAndXATransaction.java:92)
        at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit(XAResourceRecord.java:682)
        at com.arjuna.ats.arjuna.coordinator.BasicAction.onePhaseCommit(BasicAction.java:2278)
        at com.arjuna.ats.arjuna.coordinator.BasicAction.End(BasicAction.java:1479)
        at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:98)
        at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:162)
        at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1189)
        at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:126)
        at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:75)
        at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.afterDelivery(MessageEndpointInvocationHandler.java:72) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source) [:1.8.0_66]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.8.0_66]
        at java.lang.reflect.Method.invoke(Method.java:497) [rt.jar:1.8.0_66]
        at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.handle(AbstractInvocationHandler.java:60) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.doInvoke(MessageEndpointInvocationHandler.java:136) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:73) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at com.sun.proxy.$Proxy94.afterDelivery(Unknown Source)
        at org.apache.activemq.ra.MessageEndpointProxy$MessageEndpointAlive.afterDelivery(MessageEndpointProxy.java:128)
        at org.apache.activemq.ra.MessageEndpointProxy.afterDelivery(MessageEndpointProxy.java:69)
        at org.apache.activemq.ra.ServerSessionImpl.afterDelivery(ServerSessionImpl.java:225)
        at org.apache.activemq.ActiveMQSession.run(ActiveMQSession.java:1016) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.ra.ServerSessionImpl.run(ServerSessionImpl.java:169)
        at org.jboss.jca.core.workmanager.WorkWrapper.run(WorkWrapper.java:215)
        at org.jboss.threads.SimpleDirectExecutor.execute(SimpleDirectExecutor.java:33)
        at org.jboss.threads.QueueExecutor.runTask(QueueExecutor.java:808)
        at org.jboss.threads.QueueExecutor.access$100(QueueExecutor.java:45)
        at org.jboss.threads.QueueExecutor$Worker.run(QueueExecutor.java:849)
        at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66]
        at org.jboss.threads.JBossThread.run(JBossThread.java:122)
Caused by: javax.transaction.xa.XAException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4
        at org.apache.activemq.transaction.XATransaction.newXAException(XATransaction.java:174)
        at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:368)
        at org.apache.activemq.broker.TransactionBroker.commitTransaction(TransactionBroker.java:252)
        at org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(MutableBrokerFilter.java:117)
        at org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransportConnection.java:498)
        at org.apache.activemq.command.TransactionInfo.visit(TransactionInfo.java:100) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:248)
        at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48) [activemq-client-5.10.0.jar:5.10.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_66]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_66]
        at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66]

16:41:35,410 WARN  [com.arjuna.ats.jta] (default-threads - 39) ARJUNA016039: onePhaseCommit on < formatId=131077, gtrid_length=29, bqual_length=36, tx_uid=0:ffff0a48263f:-669b4574:57e296f4:23fc95, node_name=1, branch_uid=0:ffff0a48263f:-669b4574:57e296f4:23fc96, subordinatenodename=null, eis_name=unknown eis name > ([org.apache.activemq.ra.LocalAndXATransaction@7e5f0986,TransactionContext{transactionId=null,connection=ActiveMQConnection {id=ID:tsfla902v-34440-1474467574382-7:1,clientId=ID:tsfla902v-34440-1474467574382-6:1,started=true}}]) failed with exception XAException.XAER_NOTA: javax.transaction.xa.XAException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4
        at org.apache.activemq.TransactionContext.toXAException(TransactionContext.java:786) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.TransactionContext.commit(TransactionContext.java:595) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.ra.LocalAndXATransaction.commit(LocalAndXATransaction.java:92)
        at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit(XAResourceRecord.java:682)
        at com.arjuna.ats.arjuna.coordinator.BasicAction.onePhaseCommit(BasicAction.java:2278)
        at com.arjuna.ats.arjuna.coordinator.BasicAction.End(BasicAction.java:1479)
        at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:98)
        at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:162)
        at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1189)
        at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:126)
        at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:75)
        at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.afterDelivery(MessageEndpointInvocationHandler.java:72) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source) [:1.8.0_66]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.8.0_66]
        at java.lang.reflect.Method.invoke(Method.java:497) [rt.jar:1.8.0_66]
        at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.handle(AbstractInvocationHandler.java:60) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.doInvoke(MessageEndpointInvocationHandler.java:136) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:73) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at com.sun.proxy.$Proxy94.afterDelivery(Unknown Source)
        at org.apache.activemq.ra.MessageEndpointProxy$MessageEndpointAlive.afterDelivery(MessageEndpointProxy.java:128)
        at org.apache.activemq.ra.MessageEndpointProxy.afterDelivery(MessageEndpointProxy.java:69)
        at org.apache.activemq.ra.ServerSessionImpl.afterDelivery(ServerSessionImpl.java:225)
        at org.apache.activemq.ActiveMQSession.run(ActiveMQSession.java:1016) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.ra.ServerSessionImpl.run(ServerSessionImpl.java:169)
        at org.jboss.jca.core.workmanager.WorkWrapper.run(WorkWrapper.java:215)
        at org.jboss.threads.SimpleDirectExecutor.execute(SimpleDirectExecutor.java:33)
        at org.jboss.threads.QueueExecutor.runTask(QueueExecutor.java:808)
        at org.jboss.threads.QueueExecutor.access$100(QueueExecutor.java:45)
        at org.jboss.threads.QueueExecutor$Worker.run(QueueExecutor.java:849)
        at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66]
        at org.jboss.threads.JBossThread.run(JBossThread.java:122)
Caused by: javax.transaction.xa.XAException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4
        at org.apache.activemq.transaction.XATransaction.newXAException(XATransaction.java:174)
        at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:368)
        at org.apache.activemq.broker.TransactionBroker.commitTransaction(TransactionBroker.java:252)
        at org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(MutableBrokerFilter.java:117)
        at org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransportConnection.java:498)
        at org.apache.activemq.command.TransactionInfo.visit(TransactionInfo.java:100) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:248)
        at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48) [activemq-client-5.10.0.jar:5.10.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_66]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_66]
        at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66]

16:41:35,418 WARN  [com.arjuna.ats.arjuna] (default-threads - 39) ARJUNA012084: One-phase commit of action 0:ffff0a48263f:-669b4574:57e296f4:23fc95 received heuristic decision: TwoPhaseOutcome.HEURISTIC_HAZARD

EDIT 2 :

It seems that somebody had the same problem here : https://issues.apache.org/jira/browse/AMQ-5953

The version of my ActiveMQ module is 5.11, I will try to install the 5.13 version, see if it's work. I keep you guys updated.


Solution

  • I finally found the solution ! As I explain it in an edit of my original post, the problem seems to be related to an XATransaction being destroyed while my code was processing the event. The transaction was recreated in the queue, and my code was processing it after.

    I was getting this stack in my server.log :

    16:41:35,376 WARN  [org.apache.activemq.TransactionContext] (default-threads - 39) commit of: XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:f
    fff0a48263f:-669b4574:57e296f4:23fc96] failed with: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a
    48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23f
    c95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4
    

    Then, I found this : https://issues.apache.org/jira/browse/AMQ-5953. My ActiveMQ module inside my JBoss was in version 5.11, I managed to install 5.14. Since now, I don't have any duplicates problem anymore.