Tags: spring, rabbitmq, spring-cloud, spring-cloud-stream, spring-rabbit

Reactive Spring Cloud Stream reads all data from queue, but non-reactive reads messages one by one


I noticed a difference between the reactive and non-reactive behavior of Spring Cloud Stream. With the non-reactive approach, messages are read one by one. Here is the code snippet for the stream listener:

@StreamListener(Processor.INPUT)
public void receive2(String input) throws InterruptedException {
    Thread.sleep(2000);
    System.out.println(input.toUpperCase());
}

This code consumes messages one by one, which I can see in the RabbitMQ management UI.

After a restart, the application continues consuming messages from the point where it stopped. With a reactive stream listener, however, the service fetches all messages from the queue immediately. After a restart, the application does not continue processing messages, because the queue is already empty.

Here is the reactive stream listener snippet:

@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<String> input) {
    input
            .delayElements(Duration.ofSeconds(2))
            .map(String::toUpperCase)
            .doOnEach(System.out::println)
            .subscribe();
}


I want to understand why this happens, and whether it is possible to stop the listener from reading the whole queue before the previous elements have been processed. Am I misunderstanding the Reactor operators, or is this expected behavior? Is it possible to specify backpressure somehow?


Solution

  • Reactive processing is non-blocking: delayElements() hands each element off to another thread, which frees the listener thread. That thread returns to the container, which acks the message, so the broker delivers the next one (which is delayed in turn). As a result, all the messages end up in the scheduler's queue. Reactive is really not suitable for this use case unless you use manual acks, as shown below.

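    import java.io.IOException;
    import java.time.Duration;

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;

    @StreamListener
    public void receive1(@Input(Processor.INPUT) Flux<Message<String>> input) {
        input
                .doOnEach(System.out::println)
                .delayElements(Duration.ofSeconds(2))
                // Copy the headers so the channel and delivery tag survive the transformation.
                .map(m -> {
                    return MessageBuilder.withPayload(m.getPayload().toUpperCase())
                            .copyHeaders(m.getHeaders())
                            .build();
                })
                .doOnEach(System.out::println)
                // Ack manually, and only after the delayed processing has finished.
                .doOnNext(m -> {
                    try {
                        m.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
                            .basicAck(m.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                })
                .subscribe();
    }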
    

    together with these properties:

    spring.cloud.stream.bindings.input.group=foo
    spring.cloud.stream.rabbit.bindings.input.consumer.acknowledge-mode=manual
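
To make the reasoning above concrete, here is a minimal, single-threaded toy model in plain Java. It is not Spring, Reactor, or RabbitMQ code. The class `AckSimulation`, its `Mode` values, and the `Snapshot` fields are hypothetical names invented for this sketch, and the model assumes that the broker delivers at most one unacknowledged message at a time (a prefetch of 1). It reports where the messages are while the first one is still being processed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a consumer with a prefetch of 1 (an assumption of this sketch). */
final class AckSimulation {

    enum Mode { BLOCKING_AUTO_ACK, HANDOFF_AUTO_ACK, HANDOFF_MANUAL_ACK }

    /** Where the messages are while the first one is still being processed. */
    static final class Snapshot {
        final int ready;          // still queued on the broker
        final int unacked;        // delivered, not yet acknowledged (requeued on restart)
        final int ackedInMemory;  // acknowledged but only held by the application

        Snapshot(int ready, int unacked, int ackedInMemory) {
            this.ready = ready;
            this.unacked = unacked;
            this.ackedInMemory = ackedInMemory;
        }
    }

    static Snapshot whileFirstMessageIsProcessed(Mode mode, int total) {
        Deque<Integer> broker = new ArrayDeque<>();
        for (int i = 0; i < total; i++) {
            broker.add(i);
        }
        int unacked = 0;
        int ackedInMemory = 0;

        // The broker delivers only while no delivery is waiting for an ack.
        while (unacked == 0 && !broker.isEmpty()) {
            broker.poll();
            unacked++;
            if (mode == Mode.HANDOFF_AUTO_ACK) {
                // The listener returns at once after the hand-off, so the
                // message is acked and the next one is delivered immediately.
                unacked--;
                ackedInMemory++;
            }
            // BLOCKING_AUTO_ACK: the listener thread is still busy, so no ack yet.
            // HANDOFF_MANUAL_ACK: the ack is sent only after the delayed work ends.
        }
        return new Snapshot(broker.size(), unacked, ackedInMemory);
    }
}
```

In this model, with five messages, `HANDOFF_AUTO_ACK` leaves nothing on the broker and five acknowledged messages in application memory, which corresponds to the empty queue seen after a restart. Both `BLOCKING_AUTO_ACK` and `HANDOFF_MANUAL_ACK` leave four messages ready and one unacknowledged, so nothing would be lost on a restart.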