RabbitMQ Single Active Consumer losing message order on consumer shutdown


A sample with steps to reproduce can be found here.

Problem Definition:

We have a typical publisher-consumer setup in which messages must be consumed sequentially and in order. Our goal is to deploy multiple instances of the consuming microservice while preserving that sequential, ordered processing. The recommended Single Active Consumer (SAC) feature works well in most scenarios, except during application shutdown.

Note: We are also using quorum queues and consumer acknowledgements.

Given instances A and B, on application shutdown of the active instance A, instance B becomes active and starts consuming messages while instance A is still processing its current message (Unacked).

Using a prefetch of 1 as an example:

Before shutting down instance A: [screenshot]

During instance A shutdown: [screenshot]

While the consumer for instance A has been removed from the queue, its channel is still open and RabbitMQ still sees the unacked messages for both instances. Effectively, different messages are being processed in parallel. Both messages are processed successfully, but order is lost. The problem gets worse with a higher prefetch.
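To make the race concrete, here is a minimal sketch of the timeline described above. It is a toy model in plain Java, not RabbitMQ or Spring code: the class `SacShutdownTimeline`, the method `completionOrder`, and all durations are hypothetical and only illustrate how the second message can finish before the first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy timeline, not RabbitMQ code: every timing here is a made-up assumption.
final class SacShutdownTimeline {

    /** One message handled by one instance, finishing at time `end` (arbitrary units). */
    static final class Work {
        final String message;
        final String instance;
        final int end;

        Work(String message, String instance, int end) {
            this.message = message;
            this.instance = instance;
            this.end = end;
        }
    }

    /**
     * Instance A starts m1 at t=0 and needs m1Duration to finish it.
     * A's consumer is cancelled at shutdownAt while m1 is still unacked;
     * B then becomes the active consumer and starts m2 right away.
     */
    static List<String> completionOrder(int m1Duration, int shutdownAt, int m2Duration) {
        if (shutdownAt < 0 || shutdownAt >= m1Duration) {
            throw new IllegalArgumentException("shutdown must happen while m1 is in flight");
        }
        List<Work> work = new ArrayList<>();
        work.add(new Work("m1", "A", m1Duration));
        work.add(new Work("m2", "B", shutdownAt + m2Duration));
        // Order the messages by the time their processing completes.
        work.sort(Comparator.comparingInt(w -> w.end));
        List<String> order = new ArrayList<>();
        for (Work w : work) {
            order.add(w.message);
        }
        return order;
    }

    public static void main(String[] args) {
        // A needs 5 units for m1, shutdown starts at t=1, B needs 2 units for m2.
        System.out.println(completionOrder(5, 1, 2));
    }
}
```

Whenever B finishes its message before A finishes the one it still holds, the completion order no longer matches the queue order, which is the situation shown in the screenshots.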

Versions and Code

Queue Definition:

@Bean(name = "testSingleActiveConsumerQueue")
Queue testSingleActiveConsumerQueue() {
    final Map<String, Object> args = new HashMap<>();
    args.put("x-queue-type", "quorum");
    args.put("x-single-active-consumer", true); // ENABLE SINGLE ACTIVE CONSUMER
    return new Queue(APP_IDENTIFIER + ".test.single_active_consumer", true, false, false, args);
}

I'm using the latest RabbitMQ Docker image at the time of writing (3.12.0):

services:
  rabbitmq:
    image: rabbitmq:3.12.0-management
    ports:
      - 5672:5672
      - 15672:15672
    restart: always
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest

And Spring Integration AMQP 5.5.15:

  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-amqp</artifactId>
    <version>5.5.15</version>
  </dependency>

Solution

  • This has been identified as a bug and fixed. The fix is available in spring-amqp versions 3.0.6 and 2.4.14.

    For anyone else having the same issue: update to one of the above spring-amqp versions and enable the forceStop flag.

    From documentation:

    By default, stopping a container will cancel the consumer and process all prefetched messages before stopping. Starting with versions 2.4.14, 3.0.6, you can set the [forceStop] container property to true to stop immediately after the current message is processed, causing any prefetched messages to be requeued. This is useful, for example, if exclusive or single-active consumers are being used.

    Sample configuration:

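    // Requires spring-amqp 2.4.14+ or 3.0.6+.
    final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
    // Stop right after the current message; any prefetched messages are requeued.
    listenerContainer.setForceStop(true);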
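To illustrate what the documentation quote means for prefetched messages, here is a rough sketch in plain Java. It is only a toy model of the documented behaviour, not Spring AMQP's implementation: `ForceStopSketch`, `StopResult` and `stop` are hypothetical names, and it assumes the first prefetched message is the one currently being processed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: this is NOT Spring AMQP's implementation, just the documented behaviour.
final class ForceStopSketch {

    /** What happened to the prefetched messages when the container stopped. */
    static final class StopResult {
        final List<String> processed = new ArrayList<>();
        final List<String> requeued = new ArrayList<>();
    }

    /**
     * @param prefetched messages already delivered to the stopping consumer, in queue order;
     *                   the first one is assumed to be the message currently being processed
     * @param forceStop  models the forceStop container property
     */
    static StopResult stop(List<String> prefetched, boolean forceStop) {
        StopResult result = new StopResult();
        for (int i = 0; i < prefetched.size(); i++) {
            String message = prefetched.get(i);
            if (i == 0 || !forceStop) {
                // The in-flight message always finishes; without forceStop the rest is drained too.
                result.processed.add(message);
            } else {
                // With forceStop the remaining prefetched messages go back to the queue.
                result.requeued.add(message);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> prefetched = List.of("m1", "m2", "m3");
        StopResult drained = stop(prefetched, false);
        StopResult forced = stop(prefetched, true);
        System.out.println("default:   processed=" + drained.processed + " requeued=" + drained.requeued);
        System.out.println("forceStop: processed=" + forced.processed + " requeued=" + forced.requeued);
    }
}
```

In this model, the messages that the default stop keeps draining on the old instance are the ones that can overlap with the newly active consumer. With forceStop they return to the queue instead.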