Sample with Replicate steps found here.
Problem Definition:
We have a typical publisher - consumer setup where we require consumption of messages to be done sequentially and in order. Our goal is to be able to deploy multiple instances of the consuming microservice while maintaining the aforementioned sequentiality and order. Using the recommended Single Active Consumers (SAC) works well in most scenarios except during application shutdown.
Note: We are also using quorum queues and consumer acknowledgements.
Given instances A and B, on application shutdown of the active instance A, instance B becomes active and starts consuming messages while instance A is still processing its current message (Unacked).
Using pre-fetch 1 as an example
Before shutting down instance A:
While the consumer for instance A has been removed from the queue, it's channel is still open and RabbitMQ still sees the unacked messages for both instances. Effectively we have parallel processing of different messages. Both messages are processed successfully but order is lost. This problem is further exacerbated with higher pre-fetch.
Versions and Code
Queue Definition:
@Bean(name = "testSingleActiveConsumerQueue")
Queue testSingleActiveConsumerQueue() {
final Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
args.put("x-single-active-consumer", true); // ENABLE SINGLE ACTIVE CONSUMER
return new Queue(APP_IDENTIFIER + ".test.single_active_consumer", true, false, false, args);
}
I'm using the current latest docker image for rabbitmq (3.12.0)
services:
rabbitmq:
image: rabbitmq:3.12.0-management
ports:
- 5672:5672
- 15672:15672
restart: always
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
And Spring integration AMQP 5.5.15
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<version>5.5.15</version>
</dependency>
This has been identified as a bug and fixed. Fix is available in spring-amqp versions 3.0.6 and 2.4.14 .
For anyone else having the same issue, you need to update to the above spring-amqp versions, and enable the forceStop
flag.
From documentation:
By default, stopping a container will cancel the consumer and process all prefetched messages before stopping. Starting with versions 2.4.14, 3.0.6, you can set the [forceStop] container property to true to stop immediately after the current message is processed, causing any prefetched messages to be requeued. This is useful, for example, if exclusive or single-active consumers are being used.
Sample configuration:
final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
listenerContainer.setForceStop(true);