I am a beginner and I am working through Manning's Camel in Action. I am setting up the transaction manager as prescribed (in Chapter 9):
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://${activemq.host}:${activemq.port}" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start" destroy-method="stop">
    <property name="maxConnections" value="2" />
    <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsTxnManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
</bean>
<bean id="activemq" class="org.apache.camel.component.activemq.ActiveMQComponent">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="transacted" value="true" />
    <property name="transactionManager" ref="jmsTxnManager" />
</bean>
I have a simple consumer route setup:
<route id="exAroute">
    <from uri="activemq:{{mq.in}}"/>
    <transacted/>
    <log message="${body}"/>
</route>
When I run Camel, I get the following set of DEBUG log messages every second while the queue is empty:
10:54:30.873 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Creating new transaction with name [JmsConsumer[TEST_IN]]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
10:54:30.873 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Created JMS transaction on Session [PooledSession { ActiveMQSession {id=ID:computer-60122-1666796067926-1:2:1,started=true} java.lang.Object@192cac46 }] from Connection [PooledConnection { ConnectionPool[ActiveMQConnection {id=ID:computer-60122-1666796067926-1:2,clientId=ID:computer-60122-1666796067926-0:1,started=true}] }]
10:54:31.885 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:computer-60122-1666796067926-1:2:1:2, lastDeliveredSequenceId: -1
10:54:31.885 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
10:54:31.885 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Committing JMS transaction on Session [PooledSession { ActiveMQSession {id=ID:computer-60122-1666796067926-1:2:1,started=true} java.lang.Object@192cac46 }]
10:54:31.885 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQSession - ID:computer-60122-1666796067926-1:2:1 Transaction Commit :null
10:54:31.885 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQSession - ID:computer-60122-1666796067926-1:2:1 Transaction Rollback, txid:null
Why does this happen? Is my setup incorrect?
I have spent 2 hours researching this without any luck. I hope you can help. Thanks in advance.
PS: If I put a message in the queue for consumption, the route executes fine:
10:59:21.829 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.TransactionContext - Begin:TX:ID:computer-60151-1666796352835-1:2:1
10:59:21.829 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.camel.component.jms.DefaultJmsMessageListenerContainer - Received message of type [class org.apache.activemq.command.ActiveMQTextMessage] from consumer [PooledMessageConsumer { ActiveMQMessageConsumer { value=ID:computer-60151-1666796352835-1:2:1:5, started=true } }] of transactional session [PooledSession { ActiveMQSession {id=ID:computer-60151-1666796352835-1:2:1,started=true} java.lang.Object@296def58 }]
10:59:21.829 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.camel.component.jms.EndpointMessageListener - activemq://TEST_IN consumer received JMS message: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:computer-60156-1666796361583-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:computer-60156-1666796361583-1:1:1:1, destination = queue://TEST_IN, transactionId = null, expiration = 0, timestamp = 1666796361809, arrival = 0, brokerInTime = 1666796361809, brokerOutTime = 1666796361819, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@4db30c0e, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = test}
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.camel.spring.spi.TransactionErrorHandler - Transaction begin (0x76adb233) redelivered(false) for (MessageId: ID:computer-60156-1666796361583-1:1:1:1:1 on ExchangeId: ID-computer-1666796353110-0-1))
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Participating in existing transaction
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] INFO exAroute - test
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.camel.spring.spi.TransactionErrorHandler - Transaction commit (0x76adb233) redelivered(false) for (MessageId: ID:computer-60156-1666796361583-1:1:1:1:1 on ExchangeId: ID-computer-1666796353110-0-1))
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Committing JMS transaction on Session [PooledSession { ActiveMQSession {id=ID:computer-60151-1666796352835-1:2:1,started=true} java.lang.Object@296def58 }]
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQSession - ID:computer-60151-1666796352835-1:2:1 Transaction Commit :TX:ID:computer-60151-1666796352835-1:2:1
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.TransactionContext - Commit: TX:ID:computer-60151-1666796352835-1:2:1 syncCount: 2
10:59:21.869 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:computer-60151-1666796352835-1:2:1:5, lastDeliveredSequenceId: 844
10:59:21.869 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQSession - ID:computer-60151-1666796352835-1:2:1 Transaction Rollback, txid:null
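Camel's JMS consumer is built on Spring's DefaultMessageListenerContainer (the DefaultJmsMessageListenerContainer in your log is Camel's subclass of it). When an external transaction manager is configured, that container defaults to no caching, so in transacted mode it obtains a fresh session and consumer for every receive attempt and releases them again when the attempt times out, which by default is after one second. That is the begin/commit cycle you see repeating on the empty queue.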
This creates a lot of thrashing on the ActiveMQ broker.
Fix: Add this property to your org.apache.camel.component.activemq.ActiveMQComponent bean:
<property name="cacheLevelName" value="CACHE_CONSUMER" />
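For context, here is a sketch of how the component bean from the question might look with that property in place. The bean ids and the other three properties are copied from the question unchanged; only the cacheLevelName line is new, and I have not tested this exact configuration against your broker.

```xml
<bean id="activemq" class="org.apache.camel.component.activemq.ActiveMQComponent">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="transacted" value="true" />
    <property name="transactionManager" ref="jmsTxnManager" />
    <!-- Keep the session and consumer open between receive attempts
         instead of recreating them on every poll -->
    <property name="cacheLevelName" value="CACHE_CONSUMER" />
</bean>
```

With the consumer cached, you should still see a transaction begin and commit around each receive attempt at DEBUG level, but the consumer is no longer removed and recreated on the broker every second.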
ref: https://camel.apache.org/components/3.18.x/jms-component.html