Search code examples
rabbitmqspring-amqpspring-rabbit

Configuring Consumer Cancellation in RabbitMQ


We are using an 2-Node active-active RabbitMQ cluster with mirrored queue. With the mirroring policy being :

"policies":[{"vhost":"/","name":"ha-all","pattern":"","apply->to":"all","definition":{"ha-mode":"all","ha-sync-mode":"automatic"},"priority":0}]

Versions : RabbitMQ 3.5.4, Erlang 17.4 , spring-amqp/spring-rabbit :1.4.5.RELEASE

Now,we are trying to achieve consumer cancellation,as mentioned in Highly Available Queues.

However,since we have not used channel,we can't use {{basicConsumer}} method as given in the above link.

How do I set,"x-cancel-on-ha-failover" to true in the configuration,itself?

With the beans xml being thus :

 <rabbit:connection-factory id="connectionFactory"  
  addresses="localhost:5672"  
  username="guest"  
  password="guest"
  channel-cache-size="5" />


<!-- CREATE THE JsonMessageConverter BEAN -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />


 <!-- Spring AMQP Template -->  
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" message-converter="jsonMessageConverter" />

 <!-- in case connection is broken then Retry based on the below policy -->  
 <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">  
<property name="backOffPolicy">  
  <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">  
  <property name="initialInterval" value="500" />  
  <property name="multiplier" value="2" />  
  <property name="maxInterval" value="30000" />  
 </bean>  
</property>  
</bean>  

<rabbit:queue name="testQueue" durable="true">
    <rabbit:queue-arguments>
       <entry key="x-max-priority">
           <value type="java.lang.Integer">10</value> 
       </entry>
    </rabbit:queue-arguments>
</rabbit:queue>

<bean id="messsageConsumer"  class="consumer.RabbitConsumer">
</bean>
<rabbit:listener-container
  connection-factory="connectionFactory" concurrency="5" max-concurrency="5"     message-converter="jsonMessageConverter">
<rabbit:listener queues="testQueue" ref="messsageConsumer" />
</rabbit:listener-container>

Solution

  • The <rabbit:listener-container> actually populates a SimpleMessageListenerContainer bean on background. And the last one supports public void setConsumerArguments(Map<String, Object> args) on the matter.

    So, to fix your requirements you just need to build the raw SimpleMessageListenerContainer <bean> for your messsageConsumer.

    Meanwhile you are fixing that for your application, I'd ask you for the JIRA regarding adding <consumer-arguments> component. And we may be able to address it with the current GA deadline.