Search code examples
apache-camelactivemq-artemisqpid

Qpid client Apache Artemis 2.14.0 High availability not working


Connecting Apache Artemis broker using Qpid client for High Availablity.

The broker instance runs in two nodes and configuration listed with replication in broker.xml

Brokers instance started on node1 (master) and node2 (slave), and it was running without any issues.

The camel qpid jms client is configured with URL as failover:(amqp://localhost:5672,amqp://localhost:5673), on executing the camel client and the context started without any issue and also noticed the connection in Broker UI Console listed as AMQP protocol. [ configuration details provided below ]

With below configuration everything works fine.

To test High availablity, i stopped the broker instance on node1, and expected Qpid Camel client to automatically discover the node2 broker and process the message. But it didn't connect as expected.

But when i used aretmis-jms camel client with URL including tcp scheme connection, i was able to successfully validate the high availablity where when broker running in node1 is stopped for some reason client discovers broker on node2 automatically. Also when the node1 starts backup client automatically connects to node1.

The Qpid client is not able to discover the backup broker. Any issues in below configuration

master: For detail configuration refer link

<ha-policy>
   <replication>
      <master>
         <check-for-live-server>true</check-for-live-server>
      </master>
   </replication>
</ha-policy>

slave:

<ha-policy>
   <replication>
      <slave>
         <allow-failback>true</allow-failback>
      </slave>
   </replication>
</ha-policy>

Client uses camel

<bean id="jmsampqConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
       <property name="remoteURI" value="failover:(ampq://localhost:5672,ampq://localhost:5673)" />
       <property name="username" value="user"/>
       <property name="password" value="pass"/>
   </bean>

  <bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
    <property name="maxConnections" value="5" />
    <property name="connectionFactory" ref="jmsamqpConnectionFactory" />
  </bean>

  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
    <property name="concurrentConsumers" value="5" />
  </bean>

   <bean id="jms" class="org.apache.camel.component.amqp.AMQPComponent">
    <property name="configuration" ref="jmsConfig" />
  </bean>
  
   <bean id="CustomBean1" class="org.specific.process.class" /> 
   <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="jms:queue:enterprise1.queue" />
      <convertBodyTo type="java.lang.String" />
      <bean ref="CustomBean1" method="processCamelExchangeData" />
    </route>
    </camelContext>

maven dependency

   <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-amqp-starter</artifactId>
      <version>2.23.0</version>
    </dependency>
    <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-jms-client</artifactId>
       <version>0.54.0</version>
     </dependency> 
     <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>artemis-jms-client</artifactId>
      <version>2.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.messaginghub</groupId>
      <artifactId>pooled-jms</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-amqp</artifactId>
      <version>2.23.0</version> 
    </dependency>

logs:

2020-09-17 20:15:18,684: main DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-09-17 20:15:18,690: main DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
2020-09-17 20:15:18,692: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@3962ec84
2020-09-17 20:15:18,692: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@147e0734
2020-09-17 20:15:18,693: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@2bdab835
2020-09-17 20:15:18,694: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@7b8aebd0
2020-09-17 20:15:18,695: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@55222ee9
2020-09-17 20:15:18,743: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://localhost:5672 in-progress
2020-09-17 20:15:19,102: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (SaslMechanismFinder.java:106) - Best match for SASL auth was: SASL-PLAIN
2020-09-17 20:15:19,119: FailoverProvider: async work thread DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsConnectionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1, configuredURI = failover:(amqp://localhost:5672,amqp://localhost:5673), connectedURI = null } (1)
2020-09-17 20:15:19,162: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (AmqpConnectionBuilder.java:84) - AmqpConnection { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1 } is now open:
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:884) - Processing alternates uris:URI Pool { [amqp://localhost:5673, amqp://localhost:5672] } with new set: [amqp://localhost:61617?amqp.vhost=localhost]
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:899) - Replacing uris:URI Pool { [amqp://localhost:5673, amqp://localhost:5672] } with new set: [amqp://localhost:5672, amqp://localhost:61617?amqp.vhost=localhost]
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:913) - Processing alternates done new uris:URI Pool { [amqp://localhost:5672, amqp://localhost:61617?amqp.vhost=localhost] }
2020-09-17 20:15:19,165: AmqpProvider :(1):[amqp://localhost:5672] INFO (JmsConnection.java:1339) - Connection ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1 connected to server: amqp://localhost:5672
2020-09-17 20:15:19,168: main DEBUG (JmsConsumer.java:106) - Started listener container org.apache.camel.component.jms.DefaultJmsMessageListenerContainer@5e9bf744 on destination enterprise1.queue
2020-09-17 20:15:19,168: main INFO (DefaultCamelContext.java:4013) - Route: route1 started and consuming from: jms://queue:enterprise1.queue
2020-09-17 20:15:19,169: main DEBUG (DefaultCamelContext.java:3989) - Route: route2 >>> EventDrivenConsumerRoute[jmsTopic://topic:enterprise2.topic -> Pipeline[[Channel[convertBodyTo[java.lang.String]], Channel[org.apache.camel.component.bean.BeanProcessor@69d103f0]]]]
2020-09-17 20:15:19,169: main DEBUG (DefaultCamelContext.java:3993) - Starting consumer (order: 1001) on route: route2
2020-09-17 20:15:19,216: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1 } (2)
2020-09-17 20:15:19,216: Camel (camel) thread #5 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:2 } (3)
2020-09-17 20:15:19,216: Camel (camel) thread #6 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:3 } (4)
2020-09-17 20:15:19,217: Camel (camel) thread #7 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:4 } (5)
2020-09-17 20:15:19,238: Camel (camel) thread #8 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:5 } (6)
2020-09-17 20:15:19,361: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsConsumerInfo: { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1:1, destination = enterprise1.queue } (7)
2020-09-17 20:15:19,362: main DEBUG (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=camel,type=consumers,name=JmsConsumer(0x1132baa3)
2020-09-17 20:15:19,362: main DEBUG (DefaultConsumer.java:144) - Starting consumer: Consumer[jmsTopic://topic:enterprise2.topic]
2020-09-17 20:15:19,364: main DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-09-17 20:15:19,365: main DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
...
2020-09-17 20:15:22,486: Camel (camel) thread #7 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: start -> JmsConsumerInfo: { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:4:1, destination = enterprise1.queue } (40)
2020-09-17 20:15:22,490: Camel (camel) thread #5 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: message pull -> ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:2:1 (41)
2020-09-17 20:15:22,491: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: message pull -> ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1:1 (42)
...

Solution

  • The client is receiving an update of known broker URIs from the Artemis server it connects to which seems to only know about a server running with port 61617 advertised. This is resulting in the client replacing the URIs with the new set returned from the broker which is shown in the logs:

    Replacing uris:URI Pool { [amqp://localhost:5673, amqp://localhost:5672] } with new set: [amqp://localhost:5672, amqp://localhost:61617?amqp.vhost=localhost]
    

    So if the proper server to reconnect to is the original alternate on port 5673 then it won't ever try that one as the first broker told it that the only known brokers where the alternates it sent. You can change the client behaviour to not replace its original configuration of known hosts by setting the failover URI option below to either ADD or IGNORE.

    failover.amqpOpenServerListAction Controls how the failover transport behaves when the connection Open frame from the remote peer provides a list of failover hosts to the client. This option accepts one of three values; REPLACE, ADD, or IGNORE (default is REPLACE). If REPLACE is configured then all failover URIs other than the one for the current server are replaced with those provided by the remote peer. If ADD is configured then the URIs provided by the remote are added to the existing set of failover URIs, with de-duplication. If IGNORE is configured then any updates from the remote are dropped and no changes are made to the set of failover URIs in use.

    See the Qpid JMS client configuration page for more info.