We're using the Spring RabbitMQ binder to partition a queue. We consume the main queue and then, based on our implementation of PartitionKeyExtractorStrategy, send the messages to queue partitions.
It is important for us that messages arriving in the queue partitions preserve their order, but for some reason they do not. The logs from our PartitionKeyExtractorStrategy implementation show that messages consumed from the main queue are in the right order.
Could it be that the partition producer sends messages to the queue partitions asynchronously, or over multiple channels, so that the order is occasionally broken?
This is our application.yml config:
spring:
  cloud:
    stream:
      bindings:
        mainQueue:
          destination: TopicExchange
          group: MainQueue
          consumer:
            partitioned: false
            concurrency: 1
            maxAttempts: 1
        partitionProducer:
          destination: TopicExchange
          producer:
            partitionCount: ${REPLICAS}
            partitionKeyExtractorName: userIdKeyExtractor
        ...
      rabbit:
        bindings:
          mainQueue:
            consumer:
              bindingRoutingKeyDelimiter: ","
              bindingRoutingKey: routingKey1, routingKey2
              declareExchange: true
              queueNameGroupOnly: true
              exclusive: true
              prefetch: 100
              batchSize: 100
              transacted: true
              autoBindDlq: false
              republishToDlq: false
              requeueRejected: true
          partitionProducer:
            producer:
              declareExchange: true
          partitionConsumer:
            consumer:
              declareExchange: true
              queueNameGroupOnly: true
              prefetch: 100
              txSize: 1
              transacted: true
              autoBindDlq: false
              republishToDlq: false
              requeueRejected: true
              enableBatching: true
              batchSize: 1
              receiveTimeout: 100
          queryConsumer:
            consumer:
              anonymousGroupPrefix: com.some.Query-
              bindingRoutingKeyDelimiter: ","
              bindingRoutingKey: Event1,Event2,Event3
              declareExchange: true
              queueNameGroupOnly: true
              prefetch: 1
              txSize: 1
              autoBindDlq: false
              republishToDlq: false
              requeueRejected: true
              durableSubscription: false
              expires: 600000
As you can see above, we tried making the main queue consumer transactional, but that didn't solve our issue.
By default, a CachingConnectionFactory is used. Since channels are cached, there is no guarantee that the same channel is used when publishing on the same thread; in high-volume, multi-threaded environments, this can cause out-of-order delivery.
You can avoid this issue by defining a ThreadChannelConnectionFactory @Bean instead; Boot will then no longer auto-configure the CCF, and your bean will be used.
https://docs.spring.io/spring-amqp/docs/current/reference/html/#choosing-factory
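A minimal sketch of such a bean definition, assuming Spring AMQP 2.3 or later, where the thread-bound factory class is named ThreadChannelConnectionFactory. The configuration class name, host, port, and credentials are placeholders for illustration, not values taken from the question:

```java
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ThreadChannelConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class RabbitOrderingConfig {

    // Declaring a Spring AMQP ConnectionFactory bean causes Boot to skip
    // auto-configuring its default CachingConnectionFactory.
    @Bean
    public ThreadChannelConnectionFactory connectionFactory() {
        // Native RabbitMQ client factory; all values below are placeholders.
        ConnectionFactory rabbit = new ConnectionFactory();
        rabbit.setHost("localhost");
        rabbit.setPort(5672);
        rabbit.setUsername("guest");
        rabbit.setPassword("guest");

        // Binds a channel to each thread, so publishes made from the same
        // thread go over the same channel while it stays open.
        return new ThreadChannelConnectionFactory(rabbit);
    }
}
```

Because this bean replaces the auto-configured factory, the spring.rabbitmq.* connection properties are presumably no longer applied automatically and would have to be set on the native factory by hand. The Spring AMQP reference also notes that applications with many short-lived threads should call closeThreadChannel() on the factory to release thread-bound channels.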