Search code examples
transactionsapache-camelactivemq-classicapache-servicemix

Does ActiveMQ support multiple transactional consumers?


I am developing OSGI bundles in ServiceMix to consume from the same queue of ActiveMQ.

I need to be sure that the message is dequeued only if every thing goes fine so that I don't lose that message. So I used camel transactional client. I manged to implement it by following this tutorial http://camel.apache.org/transactional-client.html

However, the problem is that when deploy many bundles consuming from the same queue, I get only one consumer working at the same time.

What should I do to enable parallel transactional consumption in ActiveMQ ?

Thanks in advance.

Regards,


This is my implementation (just copied from the tutorial):

The camel Route:

   <route>
      <from uri="activemq:queue:foo"/>
      <transacted ref="required"/>
      <process ref="myProcessor"/>
      <to uri="activemq:queue:bar"/>
    </route>

The Spring context :

<!-- setup JMS connection factory -->
<bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="maxConnections" value="8"/>
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
</bean>

<!-- setup spring jms TX manager -->
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="poolConnectionFactory"/>
</bean>

<!-- define our activemq component -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory" ref="poolConnectionFactory"/>
    <!-- define the jms consumer/producer as transacted -->
    <property name="transacted" value="true"/>
    <!-- setup the transaction manager to use -->
    <!-- if not provided then Camel will automatic use a JmsTransactionManager, however if you
         for instance use a JTA transaction manager then you must configure it -->
    <property name="transactionManager" ref="jmsTransactionManager"/>
</bean>

Solution

  • I finally found the solution here http://activemq.2283324.n4.nabble.com/Blocking-transactions-td2354801.html;cid=1431937246689-831.

    The problem is due to prefetch limit. It is recommended to put it to 0 when there are multiple consumers even if they aren't transactional.

    So I had to change my camel route by adding ?destination.consumer.prefetchSize=0 this way :

    <route> 
    <from uri="activemq:queue:foo?destination.consumer.prefetchSize=0"/>
    <transacted ref="required"/> <process ref="myProcessor"/>
    <to uri="activemq:queue:bar"/>
    </route>