Search code examples
javarabbitmqspring-amqpspring-rabbit

Spring and Rabbit mq message order


I am developing an application that receives rest messages grouped by a session id (session 1 can be composed of 2 messages, session 2 can be composed of 10 messages) and send them to a database. The messages of a given session have the same session id inside.

For a given session, the 1st message should be sent 1st to the database, then the 2nd, etc. The order is very important inside a session.

The order of sessions is not important and we can mix their messages, e.g. we can send the messages in this order to the database:

  • session A msg 1
  • session B msg 1
  • session A msg 2
  • session C msg 1
  • session B msg 2
  • session A msg 3
  • session C msg 2

I created 10 rabbitmq queues. The application chooses the queue regarding the session id: all messages from a given session are in the same queue.

There is 1 consumer per queue so the order in the same queue is guaranteed.

For performance reasons (and traffic growing), we have to set the number of queues higher (100 queues created by the node) or deploy other instances of the application (10 nodes that have 1 consumer on each queue - so 10 consumers per queue).

Setting the number of queue higher is not difficult but the way I did is a bit ugly and have code duplication (see below). I need suggestions to make it better (and for the day we need 1000 queues).

If we deploy 10 nodes instead of 1, there will be 10 consumers for each queue and the order of messages in a queue will not be guaranteed (so message 2 from session A could be sent to database before msg 1 from session A).

The preferred solution is the 10 nodes one, as we could make it dynamic and we could start/stop nodes in docker when needed.

Here is the dependencies I use:

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>1.6.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.6.3.RELEASE</version>
    </dependency>

Here is the rabbit configuration:

@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setPrefetchCount(50);
    factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    return factory;
}
@Bean
public ConnectionFactory connectionFactory() {
    String addresses = "address1,address2";
    com.rabbitmq.client.ConnectionFactory rabbitConnection = new com.rabbitmq.client.ConnectionFactory();
    rabbitConnection.setAutomaticRecoveryEnabled(true);
    rabbitConnection.setUsername("username");
    rabbitConnection.setPassword("password");
    rabbitConnection.setVirtualHost("virtualHost");
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnection);
    connectionFactory.setPublisherConfirms(true);
    connectionFactory.setAddresses(addresses);
    connectionFactory.setChannelCacheSize(100);
    return connectionFactory;
}

At the moment, I have 10 queues that I created with 10 classes. Here is a queue example:

@Component
@RabbitListener(containerFactory = "myRabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "queue2", durable = "true"), exchange = @Exchange(type = "topic", value = "exchange2", durable = "true"), key = "key2"))
public class QueueGroup2Listener {
    @RabbitHandler
    public void processOrder(RequestMessage received) throws DataAccessResourceFailureException {
        process(received);
    }
}

I didn't find the way to make it better than creating 10 times this class with different values in the annotations (from 1 to 10).

The question is: how can I add consumers on a queue and guarantee the order of messages in a given session? I mean there are 10 consumers in a queue. Consumer A consumes messages 1 from session A so other consumers shouldn't consume the other messages from session A.

Bonus question is: how can I make the queue creation better than 1 class per queue?

Many thanks

UPDATE

An answer to this question could help me a lot RabbitMQ : Create Dynamic queues in Direct Exchange: I could create a queue per session (in this case, the next question would be how many queues rabbitmq could manage at the same time?)

UPDATE after Gary's answer

Thanks for the reply, I tried the following but the application is very very long to start consumers:

@Bean
public QueueMessageListener listener() {
    return new QueueMessageListener();
}


@Bean(name="exchange")
public Exchange exchange() {
    TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    MessageListenerAdapter adapter = new MessageListenerAdapter(listener(), "processOrder");
    container.setMessageListener(adapter);
    admin().declareExchange(exchange);
    createQueues(exchange, QUEUE, numberOfQueues, BINDING_KEY, container, null, true);
    container.start();  // very very very long 
    return exchange;
}

private void createQueues(Exchange exchange, String queuePrefix, int numberOfQueues, String bindingPrefix,
        SimpleMessageListenerContainer container, Map<String, Object> args) {
    int length = 1;
    if(numberOfQueues > 1) {
        length = (int)(Math.log10(numberOfQueues - 1) + 1);
    }
    for (int i = 0; i < numberOfQueues; i++) {
        Queue queue = new Queue(queuePrefix + String.format("%0" + length + "d", i), true, false, false, args);
        container.addQueues(queue);
        admin().declareQueue(queue);
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(bindingPrefix + i).noargs();
        admin().declareBinding(binding);
    }
}

If I don't call the start function, the consumers are not created.


Solution

  • You can spin up SimpleMessageListenerContainers programmatically rather than using the declarative paradigm.

    You can also use the RabbitAdmin to programmatically declare queues, bindings etc.

    See Configuring the Broker.

    Since Spring AMQP caches channels there is no guarantee that two sends will occur on the same channel (which introduces a very small possibility that order is lost); to ensure order, you need to use the new RabbitTemplate.invoke() method in the upcoming 2.0 release. It will perform sends within the scope of the invoke on the same channel so order is guaranteed.

    It is not a problem if your sending code is single-threaded, since the same channel will always be used in that case.