Search code examples
spring-cloud-stream

Bind a consumer queue name to an existing exchange


I'm new to spring cloud and integration, and I'm struggling to do what I would think ought to be pretty straightforward. I was able to do it by declaring the bindings in annoatations, however I need to do it with yml configuration as we're going to be specifying the sources and destinations of the microservice at runtime.

In short, we have a predefined rabbit exchange, which I want to bind to with @Consumer function definition, and then pass the pass to an IntegrationFlow bean. The main issue I'm having at the moment is that it doesn't seem like my properties are being loaded correctly:

application.yml

spring: 
  rabbitmq:
    username: guest
    password: guest
    addresses: localhost:5672
  cloud:
    stream:
      function:
        bindings:
          consumeMyQueue-in-0: input 
          consumeMyQueue-out-0: output
      default-binder: rabbit
      bindings:
        input:
          destination: predefined-exchange-name
          group: my-queue-name
      rabbit:
        bindings:
          input:
            consumer:
              exchange-durable: true
              declareExchange: false
              exchangeType: headers
              queue-binding-arguments:
                x-match: all
                some-header: some-value 

Configuration:

@Bean
public Consumer<String> consumeMyQueue() {
    return System.out::println;
}

@Bean
public IntegrationFlow myFlow( ) {
    return IntegrationFlow.from("??consumeMyQueue-out-0/output??")
        .handle(System.out::println)
        .get();
}

Result: 'declaring queue for inbound: consumeMyQueue-in-0.anonymous.SjhNhVCCRHS5umpCPw5mog, bound to: consumeMyQueue-in-0' (on consumeMyQueue-in-0 topic exchange)

Expected: ''declaring queue for inbound: my-queue-name, bound to: consumeMyQueue-in-0' (in predefined-exchange-name headers exchange)


Solution

  • See the binder documentation:

    https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html#_using_existing_queuesexchanges

    4. Using Existing Queues/Exchanges

    By default, the binder will automatically provision a topic exchange with the name being derived from the value of the destination binding property . The destination defaults to the binding name, if not provided. When binding a consumer, a queue will automatically be provisioned with the name . (if a group binding property is specified), or an anonymous, auto-delete queue when there is no group. The queue will be bound to the exchange with the "match-all" wildcard routing key (#) for a non-partitioned binding or - for a partitioned binding. The prefix is an empty String by default. If an output binding is specified with requiredGroups, a queue/binding will be provisioned for each group.

    There are a number of rabbit-specific binding properties that allow you to modify this default behavior.

    If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named myExchange and the queue is named myQueue:

    spring.cloud.stream.bindings.<binding name>.destination=myExchange
    
    spring.cloud.stream.bindings.<binding name>.group=myQueue
    
    spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false
    
    spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false
    
    spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true