Actually I don't get this running. Maybe I've misunderstood something and this is not possible anyway. I am trying to configure 2 listeners on one and the same queue, same exchange but only the routing key should differ. My problem is that somehow things get messed up. The result is that listener A gets messages which are for listener B. But only sometimes and sometimes everything works fine. Any suggestions?
MyConfig
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHostname());
connectionFactory.setUsername(getUsername());
connectionFactory.setPassword(getPassword());
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMessageConverter(new CustomMessageConverter());
factory.setConnectionFactory(connectionFactory());
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setConcurrentConsumers(10);
factory.setMaxConcurrentConsumers(10);
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
MyListeners A
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE, durable = "true"), exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true", ignoreDeclarationExceptions = "true"), key = "routingKeyA"))
public String myListenerA(@Payload PayloadA payload, @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) {
return SUCCESS_RESPONSE;
}
MyListener B
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE, durable = "true"), exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true", ignoreDeclarationExceptions = "true"), key = "routingKeyB"))
public String myListenerB(@Payload PayloadB payload, @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) {
return SUCCESS_RESPONSE;
}
Additional information: I've got 20 consumers on this queue. Thx in advance!
RabbitMQ doesn't work that way; unlike JMS, there is no way to select messages from a queue (e.g. based on the routing key).
All you have done is bound the same queue to the exchange with 2 different routing keys. So, yes, either listener will get the message, regardless of how it got to the queue.
With RabbitMQ, you need a separate queue for each listener. When the producer publishes to the exchange, the broker will take care of routing the message to the correct queue, based on the routing key he used.
If you have multiple instances of each listener, the messages will be distributed accordingly (only one delivery per queue).