Search code examples
rabbitmqamqpspring-amqpspring-rabbitround-robin

RabbitMQ: round-robin for multiple consumer


Project consist of three main things:

  • Producer (device) send data to broker on MQTT protocol
  • RabbitMQ broker
  • Consumer (spring AMQP) for communication between application and broker.

(device MQTT protocol) producer --> RabbitMQ <-- consumer (Spring app use AMQP protocol)

I use MQTTX Client Toolbox for emulate producer:

quick.white.rabbit - topic
ewogICAibWF0cml4IjoiZm9sbG93IHRoZSB3aGl0ZSByYWJiaXTigKYiLAogICAiZGF0ZSI6Ik1hcmNoIDMxLCAxOTk5Igp9 - Base64 encoding data
@Configuration
public class RabbitMQConfig {
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("amq.topic");
    }
    @Bean
    public Queue matrixQueue(){
        return new Queue("matrix.queue");
    }

    @Bean
    public Binding binding(TopicExchange exchange,
                             Queue matrixQueue) {
        return BindingBuilder.bind(matrixQueue)
                .to(exchange)
                .with("*.*.rabbit");
    }
}
@Service
public class RabbitMQConsumer {

    static void dumb(Object payload, Map<String, Object> headers){
        System.out.println("-----------------------------------");
        headers.forEach((k,v) -> System.out.println(k + "=" + v));
        System.out.println(payload);
    }

    @RabbitListener(queues = "#{matrixQueue.name}")
    public void consumePayload(@Payload String encodedMessage, @Headers Map<String, Object> headers){
        String payload = new String(Base64.getDecoder().decode(encodedMessage));
        dumb(payload, headers);
    }
}

I run two instance of same application consumer on different port 8081 and 8082

    java -jar target/consumer_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8081
    java -jar target/consumer_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8082

The problem is the message arrives to two consumer at the same time

java -jar target/deep_dive_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8081
-----------------------------------
amqp_receivedDeliveryMode=NON_PERSISTENT
amqp_receivedRoutingKey=quick.white.rabbit
amqp_receivedExchange=amq.topic
x-mqtt-publish-qos=0
x-mqtt-dup=false
amqp_deliveryTag=3
amqp_consumerQueue=spring.gen-X6e3uiz5S-K4_RA0tt89cg
amqp_redelivered=false
id=debc649f-9ae1-82a8-be5f-0a3f2c9e2f87
amqp_consumerTag=amq.ctag-brMleQwCDNmcCHyT_lbF8A
amqp_lastInBatch=false
timestamp=1708633236541
{
   "matrix":"follow the white rabbit…",
   "date":"March 31, 1999"
}
java -jar target/deep_dive_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8082
-----------------------------------
amqp_receivedDeliveryMode=NON_PERSISTENT
amqp_receivedRoutingKey=quick.white.rabbit
amqp_receivedExchange=amq.topic
x-mqtt-publish-qos=0
x-mqtt-dup=false
amqp_deliveryTag=3
amqp_consumerQueue=spring.gen-Ym1vcVbUQ4aZqnG3Bu4OyA
amqp_redelivered=false
id=dd1a5d91-4972-dcca-0669-dc40f9d85a15
amqp_consumerTag=amq.ctag-zqy6gfLSLaZSf3H3W00M1A
amqp_lastInBatch=false
timestamp=1708633236541
{
   "matrix":"follow the white rabbit…",
   "date":"March 31, 1999"
}

How can i configure RabbitMQ to use round robin consumer, each time a message is delivered, the next consumer in the queue will receive the next message ? Currently it goes for two both consumer simultaniously.

MESSAGE 1 // port 8081

MESSAGE 2 // port 8082

MESSAGE 3 // port 8081

MESSAGE 4 // port 8082

Solution

  • For that goal you must use the same queue for all consumers. The AnonymousQueue makes all of the to be subscribed with their own queue, so all of them receives the same message.

    See more info in this tutorial: https://www.rabbitmq.com/tutorials/tutorial-two-spring-amqp

    UPDATE

    I thought about this though:

    @Configuration
    public class RabbitMQConfig {
        @Bean
        public TopicExchange exchange(){
            return new TopicExchange("amq.topic");
        }
        @Bean
        public Queue matrixQueue(){
            return new Queue("matrix.queue");
        }
        @Bean
        public Binding binding(TopicExchange exchange, Queue matrixQueue) {
            return BindingBuilder.bind(matrixQueue)
                    .to(exchange)
                    .with("*.*.rabbit");
        }
    }
    

    Since you didn't get it that DirectExchange matches only exactly to the routing key.