spring rabbitmq spring-rabbit spring-cloud-stream-binder

Spring RabbitMQ binder: does the partition producer use multiple channels to publish messages routed to one of the queue partitions?


We're using the Spring RabbitMQ binder to partition a queue. We consume the main queue and then, based on our implementation of PartitionKeyExtractorStrategy, send messages to queue partitions. It is important for us that messages arriving in a queue partition keep their order, but for some reason they do not. The logs from our PartitionKeyExtractorStrategy implementation show that messages consumed from the main queue are in the right order. Could it be that the partition producer sends messages to the queue partitions asynchronously, or over multiple channels, so that the order is broken from time to time?

This is our application.yml config:

spring:
  cloud:
    stream:
      bindings:
        mainQueue:
          destination: TopicExchange
          group: MainQueue
          consumer:
            partitioned: false
            concurrency: 1
            maxAttempts: 1
        partitionProducer:
          destination: TopicExchange
          producer:
            partitionCount: ${REPLICAS}
            partitionKeyExtractorName: userIdKeyExtractor
...

rabbit:
  bindings:
    mainQueue:
      consumer:
        bindingRoutingKeyDelimiter: ","
        bindingRoutingKey: routingKey1, routingKey2
        declareExchange: true
        queueNameGroupOnly: true
        exclusive: true
        prefetch: 100
        batchSize: 100
        transacted: true
        autoBindDlq: false
        republishToDlq: false
        requeueRejected: true
    partitionProducer:
      producer:
        declareExchange: true
    partitionConsumer:
      consumer:
        declareExchange: true
        queueNameGroupOnly: true
        prefetch: 100
        txSize: 1
        transacted: true
        autoBindDlq: false
        republishToDlq: false
        requeueRejected: true
        enableBatching: true
        batchSize: 1
        receiveTimeout: 100
    queryConsumer:
      consumer:
        anonymousGroupPrefix: com.some.Query-
        bindingRoutingKeyDelimiter: ","
        bindingRoutingKey: Event1,Event2,Event3
        declareExchange: true
        queueNameGroupOnly: true
        prefetch: 1
        txSize: 1
        autoBindDlq: false
        republishToDlq: false
        requeueRejected: true
        durableSubscription: false
        expires: 600000

As you can see above, we've tried making the main queue consumer transactional, but that didn't solve the issue.


Solution

  • By default, a CachingConnectionFactory is used. Because channels are cached, there is no guarantee that the same channel is used for every publish on a given thread. In high-volume, multi-threaded environments, this can cause out-of-order delivery.

    You can avoid this by defining a ThreadChannelConnectionFactory @Bean instead. Boot will then no longer auto-configure the CachingConnectionFactory, and your bean will be used.

    https://docs.spring.io/spring-amqp/docs/current/reference/html/#choosing-factory
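A minimal sketch of such a bean definition is shown below. It assumes Spring AMQP 2.3 or later, where the thread-bound factory class is named ThreadChannelConnectionFactory. The class name RabbitOrderingConfig and the host, port, and credentials are placeholders for illustration, not values from the question. Because Boot's auto-configured factory is replaced, the connection settings probably have to be set on the underlying client factory by hand.

```java
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ThreadChannelConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class RabbitOrderingConfig {

    // Defining any Spring AMQP ConnectionFactory bean makes Boot's
    // auto-configured CachingConnectionFactory back off.
    @Bean
    public ThreadChannelConnectionFactory connectionFactory() {
        // Underlying RabbitMQ Java client factory.
        // Host, port, and credentials below are placeholder assumptions.
        ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
        rabbitConnectionFactory.setHost("localhost");
        rabbitConnectionFactory.setPort(5672);
        rabbitConnectionFactory.setUsername("guest");
        rabbitConnectionFactory.setPassword("guest");

        // Binds one channel to each publishing thread, so all publishes
        // from that thread go through the same channel while it stays open.
        return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
    }
}
```

With the main queue consumer at concurrency: 1, as in the configuration above, a single listener thread does the publishing, so it should keep reusing one channel. If the application creates many short-lived threads, the reference documentation recommends calling the factory's closeThreadChannel() on those threads to release the channel.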