Search code examples
java spring-cloud-stream spring-rabbit

How to use RMQ and spring cloud stream for creating partition based consumer


I am able to develop a sample consumer using Spring Cloud Stream and RabbitMQ. If the producer creates 3 partitions and I deploy 3 instances in CF, each instance picks one queue and processes its messages using the instance index, as documented.

Now the question is: if I have 10 partitions, it seems I need 10 instances, which is a waste of resources. Can one consumer listen to multiple partitions? The reason I have a partition-based producer is that the order in which messages are processed matters to me.


Solution

  • Here is one way...

    /**
     * Sample consumer that declares two separate input bindings in one application and
     * handles the messages from both with the same logic.
     *
     * <p>The binding names are exposed as constants on {@link TwoInputs} so that the
     * {@code @Input} declarations and the {@code @StreamListener} methods cannot drift apart.
     */
    @SpringBootApplication
    @EnableBinding(TwoInputs.class)
    public class So43661064Application {

        /**
         * Starts the Spring Boot application.
         *
         * @param args command-line arguments, passed through to {@code SpringApplication.run}
         */
        public static void main(String[] args) {
            SpringApplication.run(So43661064Application.class, args);
        }

        /**
         * Listener for the {@code input1} binding; delegates to {@link #doFoo(String)}.
         *
         * @param in the received payload
         */
        @StreamListener(TwoInputs.INPUT1)
        public void foo1(String in) {
            doFoo(in);
        }

        /**
         * Listener for the {@code input2} binding; delegates to {@link #doFoo(String)}.
         *
         * @param in the received payload
         */
        @StreamListener(TwoInputs.INPUT2)
        public void foo2(String in) {
            doFoo(in);
        }

        /**
         * Shared handling for both listeners: prints the payload to standard output.
         *
         * @param in the received payload
         */
        protected void doFoo(String in) {
            System.out.println(in);
        }

        /**
         * Binding interface declaring the two input channels used by this application.
         */
        public interface TwoInputs {

            /** Name of the first input binding (the {@code input1} segment of the binding property keys). */
            String INPUT1 = "input1";

            /** Name of the second input binding (the {@code input2} segment of the binding property keys). */
            String INPUT2 = "input2";

            /** @return the channel for the {@code input1} binding */
            @Input(INPUT1)
            SubscribableChannel input1();

            /** @return the channel for the {@code input2} binding */
            @Input(INPUT2)
            SubscribableChannel input2();

        }

    }
    

    and

    # Binding "input1": consumer group bar-0 on destination foo, with Rabbit binding routing key foo-0.
    spring.cloud.stream.bindings.input1.group=bar-0
    spring.cloud.stream.bindings.input1.destination=foo
    spring.cloud.stream.rabbit.bindings.input1.consumer.bindingRoutingKey=foo-0
    
    # Binding "input2": same destination foo, but its own group (bar-1) and routing key foo-1.
    spring.cloud.stream.bindings.input2.group=bar-1
    spring.cloud.stream.bindings.input2.destination=foo
    spring.cloud.stream.rabbit.bindings.input2.consumer.bindingRoutingKey=foo-1
    

    This will consume from the 2 partitions created by the producer in the answer to your other question.

    There's currently not a way to have a @StreamListener listen directly to 2 partitions.

    EDIT

    Here is another way, using exchange->exchange binding...

    Producer

    /**
     * Sample producer. On startup it declares one topic exchange per partition, binds each
     * of them to the destination exchange, sends two messages and then exits.
     */
    @SpringBootApplication
    @EnableBinding(Source.class)
    public class So43614477Application implements CommandLineRunner {

        @Value("${spring.cloud.stream.bindings.output.destination}")
        private String destination;

        @Value("${spring.cloud.stream.bindings.output.producer.partition-count}")
        private int partitionCount;

        @Autowired
        private AmqpAdmin admin;

        @Autowired
        private MessageChannel output;

        /**
         * Runs the application and closes the returned context as soon as startup completes.
         *
         * @param args command-line arguments, passed through to {@code SpringApplication.run}
         */
        public static void main(String[] args) {
            SpringApplication.run(So43614477Application.class, args).close();
        }

        /**
         * Declares the per-partition exchanges and bindings, then sends one message with
         * {@code whichPart} header 0 and one with {@code whichPart} header 1.
         *
         * @param args command-line arguments (unused)
         */
        @Override
        public void run(String... args) throws Exception {
            int index = 0;
            while (index < this.partitionCount) {
                declarePartitionExchange(index);
                index++;
            }

            sendWithPartitionHeader("fiz", 0);
            sendWithPartitionHeader("buz", 1);
        }

        /**
         * Declares the topic exchange named {@code <destination>-<index>} and binds it to the
         * destination topic exchange, using that same name as the routing key.
         */
        private void declarePartitionExchange(int index) {
            String partitionName = this.destination + "-" + index;
            TopicExchange partitionExchange = new TopicExchange(partitionName);
            this.admin.declareExchange(partitionExchange);
            Binding exchangeBinding = BindingBuilder
                    .bind(partitionExchange)
                    .to(new TopicExchange(this.destination))
                    .with(partitionName);
            this.admin.declareBinding(exchangeBinding);
        }

        /** Sends {@code payload} on the output channel with the {@code whichPart} header set to {@code part}. */
        private void sendWithPartitionHeader(String payload, int part) {
            this.output.send(MessageBuilder.withPayload(payload).setHeader("whichPart", part).build());
        }

    }
    

    and

    # Binding "output": destination foo, 2 partitions, partition key taken from the whichPart message header.
    spring.cloud.stream.bindings.output.destination=foo
    spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
    spring.cloud.stream.bindings.output.producer.partition-count=2
    

    Consumer

    /**
     * Sample consumer bound to the standard {@code Sink} input; every received payload is
     * written to standard output.
     */
    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So43661064Application {

        /**
         * Listener for the {@code Sink.INPUT} binding.
         *
         * @param in the received payload
         */
        @StreamListener(Sink.INPUT)
        public void foo1(final String in) {
            System.out.println(in);
        }

        /**
         * Starts the Spring Boot application.
         *
         * @param args command-line arguments, passed through to {@code SpringApplication.run}
         */
        public static void main(final String[] args) {
            SpringApplication.run(So43661064Application.class, args);
        }

    }
    

    and

    # Binding "input": consumer group bar, with a comma-separated list of two destinations (foo-0 and foo-1).
    spring.cloud.stream.bindings.input.group=bar
    spring.cloud.stream.bindings.input.destination=foo-0,foo-1
    

    The partitions from the primary exchange are routed to the partition exchanges, and the consumer gets a list of exchanges to bind its queues to.

    You could pass that list in on the command line.