Search code examples
spring-bootspring-cloud-streamspring-rabbit

Spring Cloud Stream topic per message for different consumers (in one Comsumer app)


This question is similar to Spring Cloud Stream topic per message for different consumers but the difference is that I want multiple Sinks in one consumer springboot application and I want to do this by rabbitmq topic(which is by default in spring cloud stream). I am not able to figure out correct configuration or somethign wrong in code. I have 3 sinks/cosumers. consumer1 is default and every message goes there.

**Updated as suggested by Garry **

Comment: my Producer App has routing key='*.events' application.yml

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: my-exchange
      rabbit:
        bindings:
          output:
            producer:
              routing-key-expression: headers['*.events']
  application:
    name: publisher-service
server:
  port: 15010

Producer code snippet Comment:message is sent with routing key ="test.events" . I sm not sure of 2nd argument but i am assuming it is bindingrouting-key =test1.events.billing which means I want it to be delivered to billing consumer besides default consumer.

 source.output().send(MessageBuilder.withPayload(eventRequest.getEventMessage())
                    .setHeader("*.events", "test1.events.billing")
                    .build());

Consumer configuration Comment: I want 3 queues assigned to exchange ="myexchange" . I am not sure if config is right. application.yml

spring:
  cloud:
      stream:
        bindings:
          defaultconsumer:
            destination: my-exchange
            group: queue1
          billingconsumer:
            destination: my-exchange
            group: queue2
          messageconsumer:
            destination: my-exchange
            group: queue3

        rabbit:
          bindings:
            defaultconsumer:
              consumer:
                bindingRoutingKey: '*.events.#'
            billingconsumer:
              consumer:
                bindingRoutingKey: test1.events.billing
            messageconsumer:
              consumer:
                bindingRoutingKey: test2.events.messages

  application:
    name: subscriber-service
server:
  port: 15020

Consumer code: IEventConsumer.java Comment: I am not sure the code below is right

public interface IEventConsumer {
     String INPUT = "my-exchange";

    @Input
    SubscribableChannel defaultconsumer();

    @Input
    SubscribableChannel billingconsumer();

    @Input
    SubscribableChannel messageconsumer();
}

EventConsumer.java Comment: All Iwant from below is the message should not be received my messsageConsumer! But in reality it goes thru all these methods.



    @StreamListener("defaultconsumer")
    public void subscribe1(EventMessage eventMessage) {
        logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
    }


   @StreamListener("billingconsumer")
    public void subscribe2(EventMessage eventMessage) {
        logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @StreamListener("messageconsumer")
    public void subscribe3(EventMessage eventMessage) {
        logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

Apparently something is wrong above and I dont see this working .Any ideas?


Solution

  •     @Input(INPUT)
        SubscribableChannel defaultconsumer();
    
        @Input(INPUT)
        SubscribableChannel billingconsumer();
    
        @Input(INPUT)
        SubscribableChannel messageconsumer();
    

    You are giving all three bindings the same name; just use @INPUT and the method name will be used as the binding name.

    And

    @StreamListener("defaultconsumer")
    

    etc.

    EDIT

    I just copied your code and it worked fine...

    @SpringBootApplication
    @EnableBinding({ IEventConsumer.class, Source.class })
    public class So60879187Application {
    
        private static final Logger logger = LoggerFactory.getLogger(So60879187Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So60879187Application.class, args);
        }
    
        @StreamListener("defaultconsumer")
        public void subscribe1(String eventMessage) {
            logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
        }
    
        @StreamListener("billingconsumer")
        public void subscribe2(String eventMessage) {
            logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
        }
    
        @StreamListener("messageconsumer")
        public void subscribe3(String eventMessage) {
            logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
        }
    
        @Bean
        public ApplicationRunner runner(MessageChannel output) {
            return args -> output.send(MessageBuilder.withPayload("foo")
                    .setHeader("*.events", "test1.events.billing")
                    .build());
        }
    
    }
    
    interface IEventConsumer {
        String INPUT = "my-exchange";
    
        @Input
        SubscribableChannel defaultconsumer();
    
        @Input
        SubscribableChannel billingconsumer();
    
        @Input
        SubscribableChannel messageconsumer();
    
    }
    
    spring:
      cloud:
          stream:
            bindings:
              defaultconsumer:
                destination: my-exchange
                group: queue1
              billingconsumer:
                destination: my-exchange
                group: queue2
              messageconsumer:
                destination: my-exchange
                group: queue3
              output:
                destination: my-exchange
    
            rabbit:
              bindings:
                defaultconsumer:
                  consumer:
                    bindingRoutingKey: '*.events.#'
                billingconsumer:
                  consumer:
                    bindingRoutingKey: test1.events.billing
                messageconsumer:
                  consumer:
                    bindingRoutingKey: test2.events.messages
                output:
                  producer:
                    routing-key-expression: headers['*.events']
    
      application:
        name: subscriber-service
    server:
      port: 15020
    

    and

    2020-03-27 09:45:33.607  INFO 30366 --- [change.queue1-1] com.example.demo.So60879187Application   
      :  DefaultEventConsumer received new event [foo] 
    2020-03-27 09:45:33.607  INFO 30366 --- [change.queue2-1] com.example.demo.So60879187Application   
      :  billingEventConsumer received new event [foo] 
    

    enter image description here

    EDIT2

    Newer functional programming model equivalent...

    @SpringBootApplication
    public class So608791871Application {
    
        private static final Logger logger = LoggerFactory.getLogger(So608791871Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So608791871Application.class, args);
        }
    
        @Bean
        public Consumer<String> defaultconsumer() {
            return eventMessage ->
                    logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
        }
    
        @Bean
        public Consumer<String> billingconsumer() {
            return eventMessage ->
                    logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
        }
    
        @Bean
        public Consumer<String> messageconsumer() {
            return eventMessage ->
                    logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
        }
    
        private final DirectProcessor<Message<?>> output = DirectProcessor.create();
    
        @Bean
        public Supplier<Flux<Message<?>>> output() {
            return () -> this.output;
        }
    
        @Bean
        public ApplicationRunner runner() {
            Message<String> msg1 = MessageBuilder.withPayload("foo")
                    .setHeader("*.events", "test1.events.billing")
                    .build();
            Message<String> msg2 = MessageBuilder.withPayload("bar")
                    .setHeader("*.events", "test2.events.messages")
                    .build();
            return args -> {
                this.output.onNext(msg1);
                this.output.onNext(msg2);
            };
        }
    
    }
    
    spring:
      cloud:
        function:
          definition: defaultconsumer;billingconsumer;messageconsumer;output
        stream:
          bindings:
            defaultconsumer-in-0:
              destination: my-exchange
              group: queue1
            billingconsumer-in-0:
              destination: my-exchange
              group: queue2
            messageconsumer-in-0:
              destination: my-exchange
              group: queue3
            output-out-0:
              destination: my-exchange
    
          rabbit:
            bindings:
              defaultconsumer-in-0:
                consumer:
                  bindingRoutingKey: '*.events.#'
              billingconsumer-in-0:
                consumer:
                  bindingRoutingKey: test1.events.billing
              messageconsumer-in-0:
                consumer:
                  bindingRoutingKey: test2.events.messages
              output-out-0:
                producer:
                  routing-key-expression: headers['*.events']
    
      application:
        name: subscriber-service
    server:
      port: 15020
    

    and

    2020-03-27 14:28:37.426  INFO 3646 --- [change.queue3-1] com.example.demo.So608791871Application
      :  messageEventConsumer received new event [bar] 
    2020-03-27 14:28:37.426  INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application
      :  DefaultEventConsumer received new event [foo] 
    2020-03-27 14:28:37.426  INFO 3646 --- [change.queue2-1] com.example.demo.So608791871Application
      :  billingEventConsumer received new event [foo] 
    2020-03-27 14:28:37.429  INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application
      :  DefaultEventConsumer received new event [bar]