apache-camel amqp azureservicebus

Azure Service Bus (ASB) with Camel, MessageProducer has been closed (amqp:link:detach-forced)


I am using Apache Camel with the camel-amqp component to communicate with an Azure Service Bus (ASB) queue.

<bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
    <property name="remoteURI" value="amqps://${service.bus.uri}?amqp.idleTimeout=120000" />
    <property name="username" value="${service.bus.username}" />
    <property name="password" value="${service.bus.password}" />
    <property name="receiveLocalOnly" value="true" />
</bean>
<bean id="jmsCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jmsCachingConnectionFactory"/>
    <property name="cacheLevelName" value="CACHE_CONSUMER"/>
</bean>
...
<route>
    ...
    <to uri="amqp:queue:{{service.bus.queue}}?exchangePattern=InOnly"/>
</route>

The first time my route runs, an AmqpFixedProducer is created for {{service.bus.queue}}. If I run the route again more than 10 minutes later, I get the following error:

Caused by: javax.jms.JMSException: The link 'G24:360891:qpid-jms:sender:ID:b60452dd-5a2e-4478-a462-7ca280ecd953:1:2:1:themis-test' is force detached by the broker due to errors occurred in publisher(link252). Detach origin: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00. [condition = amqp:link:detach-forced]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:164)[149:org.apache.qpid.jms.client:0.26.0.redhat-1]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:117)[149:org.apache.qpid.jms.client:0.26.0.redhat-1]
    at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:262)[149:org.apache.qpid.jms.client:0.26.0.redhat-1]
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:903)[149:org.apache.qpid.jms.client:0.26.0.redhat-1]
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1800(AmqpProvider.java:101)[149:org.apache.qpid.jms.client:0.26.0.redhat-1]
    at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:789)[149:org.apache.qpid.jms.client:0.26.0.redhat-1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)[:1.8.0_111]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)[:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)[:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)[:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745)[:1.8.0_111]

It seems that the application reuses the same producer for both calls, but Service Bus force-detaches the producer's link after 10 minutes of inactivity.

My question is: how can I handle this?

Is there a way to retrieve the producer that Camel created automatically, so that an onException block can close it and retry? How can I get hold of the producer?

Thanks a lot!


Solution

  • The problem was solved by changing cacheLevelName from CACHE_CONSUMER to CACHE_NONE:

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
       <property name="connectionFactory" ref="jmsCachingConnectionFactory"/>
       <property name="cacheLevelName" value="CACHE_NONE"/>
    </bean>
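
For the retry part of the question, here is a minimal sketch, assuming the route lives in a Camel XML context and nothing else already handles the exception. Camel's onException element with a redeliveryPolicy retries the failed send without needing a handle on the producer. The retry count and delay are illustrative assumptions, not values from the original setup. Whether a retry actually gets a fresh producer depends on how the connection factory caches producers, so this is untested against Service Bus.

```xml
<!-- Sketch only: place inside the <camelContext>, before the <route> elements. -->
<onException>
    <!-- Matches the javax.jms.JMSException shown in the stack trace above -->
    <exception>javax.jms.JMSException</exception>
    <!-- Illustrative values: retry up to 3 times, waiting 2 seconds between attempts -->
    <redeliveryPolicy maximumRedeliveries="3" redeliveryDelay="2000"/>
</onException>
```

If the redeliveries are exhausted, the exception still propagates to the caller, because the sketch does not mark it as handled.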