java spring apache-camel activemq-classic mq

Issue with Producer Flow Control in Camel Route (Persistent Messages)


I am having trouble finding the right set of ActiveMQ configurations to ensure consistent message throughput in an Apache Camel route. The current setup uses the following tech:

  • Camel (2.15.2)
  • ActiveMQ (5.12.1)
  • Tomcat (7.0.56)

Below is the set of bean configurations used within Camel for ActiveMQ:

    <bean id="jmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:6616?jms.prefetchPolicy.queuePrefetch=100" />
         <property name="watchTopicAdvisories" value="false" />
         <property name="producerWindowSize" value="2300" />
    </bean>
    
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop">
        <property name="maxConnections" value="20" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="idleTimeout" value="0"/>
    </bean>
    
    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
            <property name="connectionFactory" ref="pooledConnectionFactory"/> 
            <property name="transactionManager" ref="jmsTransactionManager"/> 
            <property name="transacted" value="true"/>
    </bean>

    <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
            <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>
    
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig" />
    </bean>
    

Here is the broker-specific config found in the activemq.xml file:

<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost" dataDirectory="./activemq/data/" advisorySupport="false">
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue="PICKAXE.L5.PROC.>" producerFlowControl="true" storeUsageHighWaterMark="50" />
                    <policyEntry queue="PICKAXE.L5.COL.>" producerFlowControl="true" storeUsageHighWaterMark="95" />
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="true" />
        </managementContext>

        <persistenceAdapter>
            <kahaDB directory="./activemq/kahadb/" />
        </persistenceAdapter>

        <systemUsage>
            <systemUsage sendFailIfNoSpaceAfterTimeout="3000000">
                <memoryUsage>
                    <memoryUsage limit="750 mb" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="2 gb" />
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="500 mb" />
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>

            <transportConnector name="openwire"
                uri="tcp://0.0.0.0:6616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
            <transportConnector name="amqp"
                uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
        </transportConnectors>
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>
    </broker>

I am running the Camel routes below. Queue A receives a high volume of messages (1000/s), so it fills up fairly quickly because the ultimate consumer of these messages cannot keep up. Once the stored messages reach 50% of persistent storage, producer flow control prevents further messages from being placed on queue A. However, when I inspect the queues via JMX, the depths of both queue A and queue B stop changing, as if the consumers were blocked too.

    from("activemq:queue:PICKAXE.L5.PROC.A")
        .to("activemq:queue:PICKAXE.L5.COL.B");

    from("activemq:queue:PICKAXE.L5.COL.B")
        .autoStartup(!localFlag)
        .to(customEndpoint)
        .routeId(collectionRouteId);

For about a week I have tried various permutations of JMS/ActiveMQ configurations with no luck, so I would appreciate any ideas. The behaviour I am after is for the consumers in this flow to keep removing messages from persistent storage, which would allow messages to continue flowing through.


Solution

  • The issue was caused by the excessively large sendFailIfNoSpaceAfterTimeout, which was set to 3000000 in the above configuration. The value is in milliseconds, so this made the broker wait 50 minutes before reporting that a send() had failed because persistent storage was full.

    The above configuration was replaced with the following:

    <systemUsage sendFailIfNoSpaceAfterTimeout="300">
    

    This ensures that, because the messages are persistent and the queues are integrated into Camel routes, a send() that fails because persistent storage is full is retried every 0.3 seconds.
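
    For context, here is a sketch of how the whole systemUsage block from the question would look with that change applied. It assumes the memory, store and temp limits stay exactly as posted; only the timeout attribute differs, and the comments are added here for illustration.

    ```xml
    <!-- Fragment of activemq.xml: belongs inside the <broker> element. -->
    <systemUsage>
        <!-- Timeout is in milliseconds: a send blocked on a full store now
             fails after 0.3 s instead of after 3000000 ms (50 minutes). -->
        <systemUsage sendFailIfNoSpaceAfterTimeout="300">
            <memoryUsage>
                <memoryUsage limit="750 mb" />
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="2 gb" />
            </storeUsage>
            <tempUsage>
                <tempUsage limit="500 mb" />
            </tempUsage>
        </systemUsage>
    </systemUsage>
    ```

    The broker has to be restarted for a change to activemq.xml to take effect.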