
Spring Cloud Stream RabbitMQ connection between exchanges


I'm trying to dive into Spring Cloud Stream for RabbitMQ, but I'm missing something...

Currently I have two services:

  • ServiceA, which is the producer, with the following properties:

    spring:
        cloud:
            stream:
                default-binder: rabbit
                bindings:
                    new-price-out:
                        destination: new-price
                        content-type: application/json
                rabbit:
                    binder:
                        host: localhost
                        port: 5672
                        username: guest
                        password: guest


    @Component
    public class RabbitMqAdapter implements MessageQueuePort {
    
        private final StreamBridge streamBridge;
        private final String CREATE_PRICE_BINDING = "new-price-out";
    
        public RabbitMqAdapter(StreamBridge streamBridge){
            this.streamBridge = streamBridge;
        }
    
        @Override
        public void sendPriceCreationMessage(QuickPriceAnalysisRequest request) {
            streamBridge.send(CREATE_PRICE_BINDING, request);
        }
    }

And ServiceB, which is the consumer:


    spring:
      cloud:
        stream:
          default-binder: rabbit
          bindings:
            new-price-in:
              destination: new-price # Same exchange name as the producer
              content-type: application/json
              group: price-consumers
          rabbit:
            bindings:
              consumer:
                declareExchange: false
            binder:
              host: localhost
              port: 5672
              username: guest
              password: guest


    @Component
    @RequiredArgsConstructor
    public class RabbitInAdapter implements PriceCreationEventPort{
    
        private final NewPriceUseCase newPriceUseCase;
    
        @Bean
        @Override
        public Consumer<Message<CreatePriceCommand>> newPrice() {
            return message -> {
                CreatePriceCommand payload = message.getPayload();
            };
        }
    }

[Image: Rabbit Manager]

The issue is that I still get two separate exchanges that I need to bind manually, because the "new-price" exchange has no bindings.

[Images: new-price exchange; newPrice-in-0; manual setup that works]

Can you give me a clue as to how I can fix this?


Solution

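  • Your binding name is wrong on the consumer side.

    Your Consumer bean is named newPrice, but your binding is named new-price-in. With functional beans, the input binding name is derived from the bean name, so it has to be newPrice-in-0. Because no binding with that name is configured, the binder falls back to using newPrice-in-0 as the destination, which is why you see a second exchange. See the docs for more info: https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/functional-binding-names.html.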
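
To make the naming rule concrete, here is a minimal sketch of how the functional binding names are put together: the function (bean) name, then -in- or -out-, then the argument index. The BindingNames class and its two methods are hypothetical helpers written for illustration only; they are not part of Spring Cloud Stream.

```java
// Hypothetical helper (not a Spring Cloud Stream API) that mirrors the
// documented functional binding naming convention.
class BindingNames {

    // Input binding: <functionName>-in-<index>
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding: <functionName>-out-<index>
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        // A Consumer bean named "newPrice" has a single input at index 0.
        System.out.println(inputBinding("newPrice", 0)); // prints "newPrice-in-0"
    }
}
```

So for the Consumer bean named newPrice, the key under spring.cloud.stream.bindings should be newPrice-in-0. A Consumer has no output, so there is no -out- binding for it.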

    UPDATE

    I just wrote a simple application:

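    import java.util.function.Consumer;

    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication
    public class So79382434Application {

        public static void main(String[] args) {
            SpringApplication.run(So79382434Application.class, args);
        }

        // Producer side: sends one message through the "new-price-out" binding on startup.
        @Bean
        ApplicationRunner init(StreamBridge streamBridge) {
            return args -> streamBridge.send("new-price-out", "100.00");
        }

        // Consumer side: the bean name "newPrice" gives the input binding "newPrice-in-0".
        @Bean
        Consumer<String> newPrice() {
            return data -> System.out.println("New price is: " + data);
        }

    }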
    

    With an application.yml like this:

    spring:
      application:
        name: so-79382434
      cloud:
        stream:
          bindings:
            new-price-out:
              destination: new-price
            newPrice-in-0:
              destination: new-price
    

    It works well, and there is only one new-price topic exchange on the RabbitMQ broker.
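
Applied to ServiceB from the question, the same fix might look like the sketch below. It assumes the newPrice bean and the price-consumers group shown in the question, and it leaves out the connection settings, which stay as they are. The spring.cloud.function.definition entry is only needed when the application has more than one functional bean; it is included here as an assumption about how the service may grow.

```yaml
spring:
  cloud:
    function:
      definition: newPrice        # optional with a single functional bean
    stream:
      bindings:
        newPrice-in-0:            # was: new-price-in
          destination: new-price  # same exchange name as the producer
          content-type: application/json
          group: price-consumers
```

With this binding name, the consumer queue should be bound to the existing new-price exchange, so no manual binding between exchanges is needed.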