I was able to develop a sample consumer using Spring Cloud Stream and RabbitMQ. If the producer creates 3 partitions and I deploy 3 instances in CF, each instance picks one queue and processes its messages using the instance index, as documented.
Now my question: if I have 10 partitions, it seems I need 10 instances, which is a waste of resources. Can one consumer listen to multiple partitions? I use a partitioned producer because the order in which messages are processed matters to me.
Here is one way...
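import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

@SpringBootApplication
@EnableBinding(TwoInputs.class)
public class So43661064Application {

    public static void main(String[] args) {
        SpringApplication.run(So43661064Application.class, args);
    }

    // One listener per partition; both delegate to the same handler.
    @StreamListener("input1")
    public void foo1(String in) {
        doFoo(in);
    }

    @StreamListener("input2")
    public void foo2(String in) {
        doFoo(in);
    }

    protected void doFoo(String in) {
        System.out.println(in);
    }

    // One input channel per partition.
    public interface TwoInputs {

        @Input("input1")
        SubscribableChannel input1();

        @Input("input2")
        SubscribableChannel input2();
    }
}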
and
spring.cloud.stream.bindings.input1.group=bar-0
spring.cloud.stream.bindings.input1.destination=foo
spring.cloud.stream.rabbit.bindings.input1.consumer.bindingRoutingKey=foo-0
spring.cloud.stream.bindings.input2.group=bar-1
spring.cloud.stream.bindings.input2.destination=foo
spring.cloud.stream.rabbit.bindings.input2.consumer.bindingRoutingKey=foo-1
This will consume from the 2 partitions created by the producer in the answer to your other question.
There is currently no way to have a single @StreamListener listen directly to 2 partitions.
EDIT
Here is another way, using exchange->exchange binding...
Producer
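import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding(Source.class)
public class So43614477Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So43614477Application.class, args).close();
    }

    @Autowired
    private MessageChannel output;

    @Autowired
    private AmqpAdmin admin;

    @Value("${spring.cloud.stream.bindings.output.producer.partition-count}")
    private int partitionCount;

    @Value("${spring.cloud.stream.bindings.output.destination}")
    private String destination;

    @Override
    public void run(String... args) throws Exception {
        // Declare one exchange per partition and bind it to the primary exchange.
        for (int i = 0; i < this.partitionCount; i++) {
            String partition = this.destination + "-" + i;
            TopicExchange exchange = new TopicExchange(partition);
            this.admin.declareExchange(exchange);
            // Exchange-to-exchange binding; the routing key is the partition name.
            Binding binding = BindingBuilder.bind(exchange).to(new TopicExchange(this.destination))
                    .with(partition);
            this.admin.declareBinding(binding);
        }
        // Send one message to each partition.
        this.output.send(MessageBuilder.withPayload("fiz").setHeader("whichPart", 0).build());
        this.output.send(MessageBuilder.withPayload("buz").setHeader("whichPart", 1).build());
    }
}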
and
spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
spring.cloud.stream.bindings.output.producer.partition-count=2
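To see why the whichPart header values 0 and 1 end up on the foo-0 and foo-1 exchanges, here is a rough sketch in plain Java (no Spring involved). It assumes the default partition selection, which (as far as I know) is the key's hash code modulo partition-count, and the Rabbit binder's destination-index routing key convention seen above. The class and method names are made up for illustration; they are not part of Spring Cloud Stream.

```java
final class PartitionRoutingSketch {

    // Assumption: the default selector uses |hashCode(key)| % partitionCount.
    static int selectPartition(Object key, int partitionCount) {
        int hash = key.hashCode();
        if (hash == Integer.MIN_VALUE) {
            hash = 0; // Math.abs(Integer.MIN_VALUE) is still negative, so guard it
        }
        return Math.abs(hash) % partitionCount;
    }

    // Assumption: the routing key is "<destination>-<partition index>".
    static String routingKey(String destination, int partition) {
        return destination + "-" + partition;
    }

    public static void main(String[] args) {
        int partitionCount = 2;
        for (int whichPart = 0; whichPart < partitionCount; whichPart++) {
            int partition = selectPartition(whichPart, partitionCount);
            System.out.println("whichPart=" + whichPart + " -> " + routingKey("foo", partition));
        }
    }
}
```

With the two messages sent above this prints foo-0 for "fiz" and foo-1 for "buz", which are the routing keys used for the exchange->exchange bindings.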
Consumer
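import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class So43661064Application {

    public static void main(String[] args) {
        SpringApplication.run(So43661064Application.class, args);
    }

    // A single listener receives messages from every exchange listed in the destination.
    @StreamListener(Sink.INPUT)
    public void foo1(String in) {
        System.out.println(in);
    }
}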
and
spring.cloud.stream.bindings.input.group=bar
spring.cloud.stream.bindings.input.destination=foo-0,foo-1
Messages for each partition are routed from the primary exchange to the corresponding partition exchange, and the consumer gets a list of exchanges to bind its queues to.
You could pass that list in on the command line.
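For more partitions, writing that list by hand gets tedious. As a small sketch (the class and method names are hypothetical, and it assumes the partition exchanges are named destination-index as in the producer above), the comma-separated value could be generated like this:

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class DestinationListSketch {

    // Builds "foo-0,foo-1,..." for the given base destination and partition count.
    static String destinations(String destination, int partitionCount) {
        return IntStream.range(0, partitionCount)
                .mapToObj(i -> destination + "-" + i)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Prints a Spring Boot style command-line argument for the consumer.
        System.out.println("--spring.cloud.stream.bindings.input.destination="
                + destinations("foo", 2));
    }
}
```

The printed argument can then be appended when starting the consumer, so one instance binds to as many partition exchanges as you give it.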