rabbitmq, spring-amqp, spring-rabbit

RabbitMQ - Send message to a particular consumer in a queue


This is the scenario: there are multiple app servers, and a browser can connect via websocket to any of them.

The app servers (consumers) are all listening on a particular queue. As soon as a web socket connection is received, the particular app server binds the queue with a routing key {userId} to a direct exchange.

I want a message sent to the direct exchange with the routing key {userId} to be received only by the particular app server where the binding has occurred.

Is a direct exchange the right exchange to use in this case? Or should some other type of exchange be used?

I'm using spring-amqp to create dynamic bindings when a websocket connection comes in:

// bind the existing queue to the direct exchange, using the per-user routing key
String routingKey = MessageConstants.getRoutingKeyForUserRecommendationQueue(user);
Binding userRecommendationBinding = BindingBuilder.bind(userRecommendationsQueue)
    .to(directExchange)
    .with(routingKey);
amqpAdmin.declareBinding(userRecommendationBinding);

Solution

  • Actually, you are on the right track.

    And yes: a direct exchange with an appropriate binding should work for you.

    See more info in the RabbitMQ Tutorial: http://www.rabbitmq.com/tutorials/tutorial-four-java.html

    Also take a look at the Spring AMQP Samples on the matter: https://github.com/spring-projects/spring-amqp-samples/tree/master/rabbitmq-tutorials

    UPDATE

    "Unfortunately that is not what is happening. The messages seem to go randomly to any consumer, and not just the consumer that created the binding."

    Hmm. That's possible, because we route only by the key; after that the message is placed in the queue, which may have several consumers on different machines.

    In that case, yes: the dynamic binding alone doesn't help.

    You should consider creating a new, unique queue (auto-delete is fine), then binding it and listening on exactly that queue. The SimpleMessageListenerContainer supports addQueues() at runtime to start a new consumer for a new queue.
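
    A minimal sketch of that idea with Spring AMQP follows. It assumes one private queue per app server instance rather than one per websocket, which is my reading and not something the question states. The class name PerServerUserBindings, the queue name prefix, and the bindUser/unbindUser methods are hypothetical. The container is assumed to be configured elsewhere with a message listener and the same connection factory.

```java
import java.util.UUID;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

/** Hypothetical helper: gives this app server its own queue and binds users to it. */
public class PerServerUserBindings {

    private final AmqpAdmin amqpAdmin;
    private final DirectExchange directExchange;
    private final Queue serverQueue;

    public PerServerUserBindings(AmqpAdmin amqpAdmin,
                                 DirectExchange directExchange,
                                 SimpleMessageListenerContainer container) {
        this.amqpAdmin = amqpAdmin;
        this.directExchange = directExchange;

        // Unique name per server instance (the prefix is an assumption);
        // arguments are durable=false, exclusive=false, autoDelete=true.
        this.serverQueue = new Queue("user-recommendations." + UUID.randomUUID(), false, false, true);
        amqpAdmin.declareQueue(serverQueue);

        // Start a consumer for the new queue on the already configured container.
        container.addQueues(serverQueue);
    }

    /** Call when a websocket for this user connects to this server. */
    public Binding bindUser(String routingKey) {
        Binding binding = BindingBuilder.bind(serverQueue).to(directExchange).with(routingKey);
        amqpAdmin.declareBinding(binding);
        return binding;
    }

    /** Call when that websocket closes, so the key no longer routes to this server. */
    public void unbindUser(Binding binding) {
        amqpAdmin.removeBinding(binding);
    }
}
```

    Because only this server consumes from its queue, a message published with a given user's routing key can only arrive here. Keeping the returned Binding lets the websocket close handler remove it later.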

    I think that should work for you.

    You still don't need to change anything on the producer side: the same exchange and routingKey logic applies.
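
    To see why the producer can stay unchanged while only the queue layout changes, here is a toy in-memory model in plain Java. It is not RabbitMQ client code. The class and method names are made up for illustration, and delivery among competing consumers is simplified to strict round-robin, which only approximates what a real broker does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of direct-exchange routing. This is not RabbitMQ client code. */
public class DirectRoutingModel {

    /** A queue that hands each message to exactly one consumer (simplified round-robin). */
    static final class ModelQueue {
        private final List<String> consumers = new ArrayList<>();
        private int next = 0;

        void addConsumer(String serverId) {
            consumers.add(serverId);
        }

        /** Returns the id of the server that receives the next message. */
        String deliver() {
            String receiver = consumers.get(next % consumers.size());
            next++;
            return receiver;
        }
    }

    /** A direct exchange: a message goes to every queue bound with its exact routing key. */
    static final class ModelDirectExchange {
        private final Map<String, List<ModelQueue>> bindings = new HashMap<>();

        void bind(ModelQueue queue, String routingKey) {
            bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
        }

        /** The producer call; it is identical in both topologies below. */
        List<String> publish(String routingKey) {
            List<String> receivers = new ArrayList<>();
            for (ModelQueue queue : bindings.getOrDefault(routingKey, Collections.<ModelQueue>emptyList())) {
                receivers.add(queue.deliver());
            }
            return receivers;
        }
    }

    /** Servers A and B consume from one shared queue; B adds the binding for user-42. */
    public static List<String> sharedQueueReceivers(int messages) {
        ModelQueue shared = new ModelQueue();
        shared.addConsumer("A");
        shared.addConsumer("B");
        ModelDirectExchange exchange = new ModelDirectExchange();
        exchange.bind(shared, "user-42");
        return publishMany(exchange, "user-42", messages);
    }

    /** Each server consumes from its own queue; only B's queue is bound for user-42. */
    public static List<String> perServerQueueReceivers(int messages) {
        ModelQueue queueA = new ModelQueue();
        queueA.addConsumer("A");
        ModelQueue queueB = new ModelQueue();
        queueB.addConsumer("B");
        ModelDirectExchange exchange = new ModelDirectExchange();
        exchange.bind(queueB, "user-42");
        return publishMany(exchange, "user-42", messages);
    }

    private static List<String> publishMany(ModelDirectExchange exchange, String key, int messages) {
        List<String> receivers = new ArrayList<>();
        for (int i = 0; i < messages; i++) {
            receivers.addAll(exchange.publish(key));
        }
        return receivers;
    }

    public static void main(String[] args) {
        System.out.println("shared queue:      " + sharedQueueReceivers(4));
        System.out.println("per-server queues: " + perServerQueueReceivers(4));
    }
}
```

    Running main prints [A, B, A, B] for the shared queue and [B, B, B, B] for the per-server queues. The publish call is the same in both cases, which mirrors the point that only the consumer side needs to change.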