Search code examples
apache-camelactivemq-artemisqpid

Artemis qpid consumer Not able to make connection with the broker


Below is the configuration of Qpid client, using Camel context to connect Artemis broker.

The context files are getting started as expected, but the qpid client in this case not able to connect to the Apache Artemis 2.14.0 broker.

The qpid client is not able to connect the broker server, and there is Thread Blocking. I am not sure what is actually happening on the client side.

But we noticed that for one of another service, we use a single spring camel context file with all the queues and references which works as expected.

The producer uses a producerTemplate to push the message data using camelContextAware implementation.

is there any possibility that the producer and consumer on the queue might be conflicting here?

All i see is some thread blocks on the consumer side.

common-context.xml

...
 <bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
            <property name="remoteURI" value="failover:(amqp://host1:5672,amqp://host2:5672)"/>
    </bean>
 <bean id="pooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory"
                    init-method="start" destroy-method="stop" >
        <property name="maxConnections" value="3" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
 </bean>
 <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="concurrentConsumers" value="3" />
 </bean>
 <bean id="env-queue" class="org.apache.camel.component.amqp.AMQPComponent">
        <property name="configuration" ref="jmsConfig" />
 </bean>
...

service-context.xml Note: There were two camel context in this single xml file even then all the context got , using camel 2.20.0 version

...
< import resource="classpath:/common-context.xml"/>
...
<route id="messageProcessor" shutdownRoute="Default" shutdownRunningTask="CompleteAllTasks">
    <from uri="env-queue:queue:order-demo-queue" id="OrderInfo"/>
    <camel:setProperty propertyName="orderType">
       <camel:jsonpath trim="true">$.orderType</camel:jsonpath>
    </camel:setProperty>
...

logs:

2020-10-05 23:25:21,078: Thread-1 DEBUG (DefaultCamelContext.java:3993) - Starting consumer (order: 1000) on route: messageProcessor
2020-10-05 23:25:21,079: Thread-1 DEBUG (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=orderventprocesor,type=consumers,name=JmsConsumer(0x3fed52a3)
2020-10-05 23:25:21,079: Thread-1 DEBUG (DefaultConsumer.java:144) - Starting consumer: Consumer[env-queue://queue:order-demo-queue]
2020-10-05 23:25:21,138: Thread-1 DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-10-05 23:25:21,140: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://host1:5672 in-progress
2020-10-05 23:25:21,145: Thread-1 DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
2020-10-05 23:25:21,145: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@51524bce
2020-10-05 23:25:21,146: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@708b22ca
2020-10-05 23:25:21,147: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@314f82d7
2020-10-05 23:25:21,148: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@73719eb3
2020-10-05 23:25:21,148: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@30beb728
2020-10-05 23:25:21,155: FailoverProvider: async work thread INFO (FailoverProvider.java:751) - Connection attempt:[1] to: amqp://host1:5672 failed
2020-10-05 23:25:21,156: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://host2:5672 in-progress
2020-10-05 23:25:21,157: FailoverProvider: async work thread INFO (FailoverProvider.java:751) - Connection attempt:[1] to: amqp://host2:5672 failed
2020-10-05 23:25:21,167: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[2] to: amqp://host1:5672 in-progress

thread dump (some thing like below)

Camel (orderventprocesor) thread #5 - JmsConsumer[order-demo-queue]" #45 daemon prio=5 os_prio=0 tid=0x00007faf395b3000 nid=0x58e3 waiting for monitor entry [0x00007faf1290c000]
INFO   | jvm 1    | 2020/10/06 00:01:38 |    java.lang.Thread.State: BLOCKED (on object monitor)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at org.springframework.jms.listener.AbstractJmsListeningContainer.getSharedConnection(AbstractJmsListeningContainer.java:492)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       - waiting to lock <0x00000007ba976500> (a java.lang.Object)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1170)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
INFO   | jvm 1    | 2020/10/06 00:01:38 |       at java.lang.Thread.run(Thread.java:748)

Updated:

When i change the remoteUri to simple amqp://localhost:5672 i see an exception on the netty-resolver

 java.lang.NoClassDefFoundError: "io/netty/resolver/AddressResolverGroup"

After including the netty-resolver 4.1.51 jar, now i see below exception.

2020-10-09 15:11:04.944 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR JmsConnection:165 - Failed to connect to remote at: amqp://localhost:5672
2020-10-09 15:11:04.946 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR DefaultJmsMessageListenerContainer:934 - Could not refresh JMS Connection for destination 'order-demo-queue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
javax.jms.JMSException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:176)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:212)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:199)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createProviderConnection(JmsPoolConnectionFactory.java:651)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:108)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:105)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.addObject(GenericKeyedObjectPool.java:1221)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createJmsPoolConnection(JmsPoolConnectionFactory.java:705)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:237)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:232)
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
    at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:413)
    at org.springframework.jms.listener.AbstractJmsListeningContainer.refreshSharedConnection(AbstractJmsListeningContainer.java:398)
    at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:915)
    at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:890)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1061)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderIOException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at org.apache.qpid.jms.provider.exceptions.ProviderExceptionSupport.createOrPassthroughFatal(ProviderExceptionSupport.java:46)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:308)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:162)
    ... 19 more
Caused by: java.lang.AbstractMethodError: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
    at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:50)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:70)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:65)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:56)
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport.connect(NettyTcpTransport.java:151)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:230)
    ... 20 more
2020-10-09 15:11:09.949 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR JmsConnection:165 - Failed to connect to remote at: amqp://localhost:5672
2020-10-09 15:11:09.950 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR DefaultJmsMessageListenerContainer:934 - Could not refresh JMS Connection for destination 'order-demo-queue' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
javax.jms.JMSException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:176)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:212)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:199)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createProviderConnection(JmsPoolConnectionFactory.java:651)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:108)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:105)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
    at org.apache.commons.pool2.impl.GenericKeyedObjectPool.addObject(GenericKeyedObjectPool.java:1221)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createJmsPoolConnection(JmsPoolConnectionFactory.java:705)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:237)
    at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:232)
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
    at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:413)
    at org.springframework.jms.listener.AbstractJmsListeningContainer.refreshSharedConnection(AbstractJmsListeningContainer.java:398)
    at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:915)
    at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:890)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1061)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderIOException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at org.apache.qpid.jms.provider.exceptions.ProviderExceptionSupport.createOrPassthroughFatal(ProviderExceptionSupport.java:46)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:308)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:162)
    ... 19 more
Caused by: java.lang.AbstractMethodError: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
    at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:50)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:70)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:65)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:56)
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport.connect(NettyTcpTransport.java:151)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:230)
    ... 20 more

Solution

  • When using the failover configuration of amqp, the client was alternating between host and not printing any exception logs.

    updated the ConnectionFactory, remoteUrl to use single host (amqp://localhost:5672), and noticed the exception was on netty.io.resolver.

    I included below dependencies and now the connection is getting established as expected.

        <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-resolver</artifactId>
          <version>4.1.51.Final</version>
        </dependency>
    
       <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-transport-native-epoll</artifactId>
          <version>4.1.51.Final</version>
        </dependency>