I'm trying to dive into Spring Cloud Stream for RabbitMQ, but I'm missing something...
Currently I have two services. The first one is the producer:
    spring:
      cloud:
        stream:
          default-binder: rabbit
          bindings:
            new-price-out:
              destination: new-price
              content-type: application/json
          rabbit:
            binder:
              host: localhost
              port: 5672
              username: guest
              password: guest
    @Component
    public class RabbitMqAdapter implements MessageQueuePort {

        private final StreamBridge streamBridge;
        private final String CREATE_PRICE_BINDING = "new-price-out";

        public RabbitMqAdapter(StreamBridge streamBridge) {
            this.streamBridge = streamBridge;
        }

        @Override
        public void sendPriceCreationMessage(QuickPriceAnalysisRequest request) {
            streamBridge.send(CREATE_PRICE_BINDING, request);
        }
    }
And serviceB, which is the consumer:
    spring:
      cloud:
        stream:
          default-binder: rabbit
          bindings:
            new-price-in:
              destination: new-price # Same exchange name as the producer
              content-type: application/json
              group: price-consumers
          rabbit:
            bindings:
              consumer:
                declareExchange: false
            binder:
              host: localhost
              port: 5672
              username: guest
              password: guest
    @Component
    @RequiredArgsConstructor
    public class RabbitInAdapter implements PriceCreationEventPort {

        private final NewPriceUseCase newPriceUseCase;

        @Bean
        @Override
        public Consumer<Message<CreatePriceCommand>> newPrice() {
            return message -> {
                CreatePriceCommand payload = message.getPayload();
            };
        }
    }
The issue is that I still get two separate exchanges that I need to bind manually, because the "new-price" exchange has no bindings.
[Screenshots: new-price exchange; newPrice-in-0 exchange; manual setup that works]
Can you give me a clue how to fix this?
Your binding name is wrong on the consumer side. You have a Consumer bean named newPrice, but your binding is new-price-in. It has to be newPrice-in-0. See more info in the docs: https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/functional-binding-names.html
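To make the naming rule concrete, here is a minimal sketch of how functional binding names are composed from the function bean name, the direction, and the argument index. The BindingNames class and its methods are hypothetical helpers written only for illustration; they are not part of Spring Cloud Stream.

```java
public class BindingNames {

    // Hypothetical helper: input binding name for a function bean,
    // following the <functionName>-in-<index> convention.
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Hypothetical helper: output binding name for a function bean,
    // following the <functionName>-out-<index> convention.
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        // The Consumer bean in the question is named newPrice,
        // so its single input binding is newPrice-in-0.
        System.out.println(inputBinding("newPrice", 0));
    }
}
```

A Consumer has only an input, so newPrice-in-0 is the key to use under spring.cloud.stream.bindings on the consumer side. The producer binding new-price-out is unaffected, because StreamBridge is given that binding name explicitly.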
UPDATE
Just wrote a simple application:
    @SpringBootApplication
    public class So79382434Application {

        public static void main(String[] args) {
            SpringApplication.run(So79382434Application.class, args);
        }

        @Bean
        ApplicationRunner init(StreamBridge streamBridge) {
            return args -> streamBridge.send("new-price-out", "100.00");
        }

        @Bean
        Consumer<String> newPrice() {
            return data -> System.out.println("New price is: " + data);
        }
    }
With the application.yml like:
    spring:
      application:
        name: so-79382434
      cloud:
        stream:
          bindings:
            new-price-out:
              destination: new-price
            newPrice-in-0:
              destination: new-price
And it works well, with only one new-price topic exchange on the RabbitMQ broker.