I am having trouble finding the correct set of ActiveMQ configurations to ensure a consistent throughput of messages in an Apache Camel route. The current configuration uses the following tech:
Tomcat (7.0.56)
Below is the set of bean configurations used within Camel for ActiveMQ:
<bean id="jmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:6616?jms.prefetchPolicy.queuePrefetch=100" />
<property name="watchTopicAdvisories" value="false" />
<property name="producerWindowSize" value="2300" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop">
<property name="maxConnections" value="20" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="idleTimeout" value="0"/>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="pooledConnectionFactory"/>
<property name="transactionManager" ref="jmsTransactionManager"/>
<property name="transacted" value="true"/>
</bean>
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
Here is the broker-specific config found in the activemq.xml file:
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost" dataDirectory="./activemq/data/" advisorySupport="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="PICKAXE.L5.PROC.>" producerFlowControl="true" storeUsageHighWaterMark="50" />
<policyEntry queue="PICKAXE.L5.COL.>" producerFlowControl="true" storeUsageHighWaterMark="95" />
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="true" />
</managementContext>
<persistenceAdapter>
<kahaDB directory="./activemq/kahadb/" />
</persistenceAdapter>
<systemUsage>
<systemUsage sendFailIfNoSpaceAfterTimeout="3000000">
<memoryUsage>
<memoryUsage limit="750 mb" />
</memoryUsage>
<storeUsage>
<storeUsage limit="2 gb" />
</storeUsage>
<tempUsage>
<tempUsage limit="500 mb" />
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire"
uri="tcp://0.0.0.0:6616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
<transportConnector name="amqp"
uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
I am running the Camel routes below. Queue A receives a high volume of messages (1000/s), so it fills up quickly because the ultimate consumer of these messages cannot keep up. Once the stored messages reach 50% of persistent storage, the producer flow control rules prevent further messages from being placed on queue A. However, when I inspect the queue depths via JMX, neither queue A nor queue B changes, as if the consumers were blocked too.
from("activemq:queue:PICKAXE.L5.PROC.A")
    .to("activemq:queue:PICKAXE.L5.COL.B");
from("activemq:queue:PICKAXE.L5.COL.B")
    .autoStartup(!localFlag)
    .to(customEndpoint)
    .routeId(collectionRouteId);
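To make the flow-control thresholds concrete, here is an annotated copy of the two policy entries from the broker config. This is only a sketch: it assumes the high-water mark is applied as a percentage of the 2 GB storeUsage limit, and the byte figures are approximate.

```xml
<!-- Annotated copy of the policy entries shown earlier.
     Assumption: each high-water mark is a percentage of storeUsage limit="2 gb". -->
<policyEntries>
  <!-- 50% of 2 GB: producers sending to PICKAXE.L5.PROC.* are held back at roughly 1 GB of store usage -->
  <policyEntry queue="PICKAXE.L5.PROC.>" producerFlowControl="true" storeUsageHighWaterMark="50" />
  <!-- 95% of 2 GB: producers sending to PICKAXE.L5.COL.* are held back at roughly 1.9 GB of store usage -->
  <policyEntry queue="PICKAXE.L5.COL.>" producerFlowControl="true" storeUsageHighWaterMark="95" />
</policyEntries>
```

With these values, the route from queue A to queue B should still be able to write to B after external producers to A have been stopped, which is why the unchanged queue depths were unexpected.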
For about a week I have tried various permutations of JMS/ActiveMQ configurations with no luck, so I would appreciate any ideas. The behaviour I am after is for the consumers in this flow to keep removing messages from persistent storage, which would allow messages to continue flowing through.
The issue was caused by the excessively large sendFailIfNoSpaceAfterTimeout, which was set to 3000000 in the above configuration. The value is in milliseconds, so the broker waited up to 50 minutes before reporting that a send() had failed because persistent storage was full.
The above configuration was replaced with the following:
<systemUsage sendFailIfNoSpaceAfterTimeout="300">
This ensures that (because the messages are persistent and the queues are integrated into Camel routes) send() operations are retried every 0.3 seconds when they fail because persistent storage is full.
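For context, the changed element sits inside the systemUsage block roughly as sketched below. The limits are copied from the original broker config on the assumption that only the timeout was changed.

```xml
<systemUsage>
  <!-- Timeout is in milliseconds: 300 ms here, versus 3000000 ms (50 minutes) before -->
  <systemUsage sendFailIfNoSpaceAfterTimeout="300">
    <memoryUsage>
      <memoryUsage limit="750 mb" />
    </memoryUsage>
    <storeUsage>
      <storeUsage limit="2 gb" />
    </storeUsage>
    <tempUsage>
      <tempUsage limit="500 mb" />
    </tempUsage>
  </systemUsage>
</systemUsage>
```

A suitable timeout depends on how quickly the downstream consumer frees up store space, so 300 is a starting point to tune rather than a recommended value.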