I'm new to Spring Cloud and Spring Integration, and I'm struggling to do something I would expect to be pretty straightforward. I was able to do it by declaring the bindings in annotations; however, I need to do it with yml configuration, as we're going to be specifying the sources and destinations of the microservice at runtime.
In short, we have a predefined Rabbit exchange, which I want to bind to with a Consumer function definition, and then pass the messages on to an IntegrationFlow bean. The main issue I'm having at the moment is that my properties don't seem to be loaded correctly:
application.yml
spring:
  rabbitmq:
    username: guest
    password: guest
    addresses: localhost:5672
  cloud:
    stream:
      function:
        bindings:
          consumeMyQueue-in-0: input
          consumeMyQueue-out-0: output
      default-binder: rabbit
      bindings:
        input:
          destination: predefined-exchange-name
          group: my-queue-name
      rabbit:
        bindings:
          input:
            consumer:
              exchange-durable: true
              declareExchange: false
              exchangeType: headers
              queue-binding-arguments:
                x-match: all
                some-header: some-value
Configuration:
    @Bean
    public Consumer<String> consumeMyQueue() {
        return System.out::println;
    }

    @Bean
    public IntegrationFlow myFlow() {
        return IntegrationFlow.from("??consumeMyQueue-out-0/output??")
                .handle(System.out::println)
                .get();
    }
Result: 'declaring queue for inbound: consumeMyQueue-in-0.anonymous.SjhNhVCCRHS5umpCPw5mog, bound to: consumeMyQueue-in-0' (on consumeMyQueue-in-0 topic exchange)
Expected: 'declaring queue for inbound: my-queue-name, bound to: consumeMyQueue-in-0' (on predefined-exchange-name headers exchange)
See the binder documentation:
4. Using Existing Queues/Exchanges
By default, the binder will automatically provision a topic exchange with the name being derived from the value of the destination binding property <prefix><destination>. The destination defaults to the binding name, if not provided. When binding a consumer, a queue will automatically be provisioned with the name <prefix><destination>.<group> (if a group binding property is specified), or an anonymous, auto-delete queue when there is no group. The queue will be bound to the exchange with the "match-all" wildcard routing key (#) for a non-partitioned binding or <destination>-<instanceIndex> for a partitioned binding. The prefix is an empty String by default. If an output binding is specified with requiredGroups, a queue/binding will be provisioned for each group.
There are a number of rabbit-specific binding properties that allow you to modify this default behavior.
If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named myExchange and the queue is named myQueue:
spring.cloud.stream.bindings.<binding name>.destination=myExchange
spring.cloud.stream.bindings.<binding name>.group=myQueue
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true
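Applied to the configuration in the question, the properties above might look roughly like the following in application.yml. This is only a sketch: it assumes the binding is still known by its default functional name consumeMyQueue-in-0 (the name that appears in the log output); if the rename to input under spring.cloud.stream.function.bindings is in effect, use input in both places instead. It also assumes the queue my-queue-name already exists and is already bound to the headers exchange, since bindQueue: false tells the binder not to create that binding itself.

```yaml
spring:
  cloud:
    stream:
      bindings:
        consumeMyQueue-in-0:                      # assumed binding name, see note above
          destination: predefined-exchange-name   # existing exchange
          group: my-queue-name                    # existing queue
      rabbit:
        bindings:
          consumeMyQueue-in-0:
            consumer:
              bindQueue: false            # do not bind the queue to the exchange
              declareExchange: false      # do not declare the exchange
              queueNameGroupOnly: true    # queue name is the group only, not <destination>.<group>
```

With queueNameGroupOnly set, the consumer should listen on my-queue-name rather than predefined-exchange-name.my-queue-name.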