I noticed difference in reactive and non-reactive behavior of Spring Cloud Stream. When I use non-reactive approach messages are read one by one. Here is code snipped for stream listener:
@StreamListener(Processor.INPUT)
public void receive2(String input) throws InterruptedException {
Thread.sleep(2000);
System.out.println(input.toUpperCase());
}
This code consumes messages one by one. I can see this from RabbitMQ Management site:
After the restart application continues to consume messages from the point where it finished before restart. But using reactive stream listener service fetches all messages from the queue immediately. After the restart of the application it does not continue to process messages because the queue is empty.
And here is reactive stream listener snipped:
@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<String> input) {
input
.delayElements(Duration.ofSeconds(2))
.map(String::toUpperCase)
.doOnEach(System.out::println)
.subscribe();
}
I want to understand why it happens and is it possible to stop reading the whole queue before processing previous elements. Is it my wrong understanding of Reactor operators or it is expected behavior? Is it possible to somehow specify backpressure?
Since reactive is non-blocking; the delayElements()
hands off the work to another thread; this frees up the listener thread which returns to the container and acks the message; hence the next one is delivered (and delayed); so, all the messages end up in the scheduler's queue. Reactive is really not suitable for this use case, unless you use manual acks, which is shown below.
@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<Message<String>> input) {
input
.doOnEach(System.out::println)
.delayElements(Duration.ofSeconds(2))
.map(m -> {
return MessageBuilder.withPayload(m.getPayload().toUpperCase())
.copyHeaders(m.getHeaders())
.build();
})
.doOnEach(System.out::println)
.doOnNext(m -> {
try {
m.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
.basicAck(m.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
}
catch (IOException e) {
e.printStackTrace();
}
})
.subscribe();
}
and
spring.cloud.stream.bindings.input.group=foo
spring.cloud.stream.rabbit.bindings.input.consumer.acknowledge-mode=manual