spring, spring-boot, spring-amqp, project-reactor, spring-rabbit

How to Ack/Nack with reactive RabbitListener in Spring AMQP?


I'm using Spring AMQP 2.1.6 to consume messages with a RabbitListener method that returns a Mono<Void>. For example:

@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
    Mono<Void> mono = myService.doSomething(myMessage);
    return mono;
}

The documentation says:

The listener container factory must be configured with AcknowledgeMode.MANUAL so that the consumer thread will not ack the message; instead, the asynchronous completion will ack or nack the message when the async operation completes.

I've therefore configured the container factory with AcknowledgeMode.MANUAL, but it's not clear to me whether "the asynchronous completion will ack or nack the message when the async operation completes" means that Spring AMQP handles this itself or that I have to do it. That is, do I have to ack/nack the message myself after the call to myService.doSomething(myMessage), or does Spring AMQP ack it automatically because I'm returning a Mono (even though AcknowledgeMode.MANUAL is set)?

If I do need to send acks or rejects manually, what's the idiomatic way to do this in a non-blocking manner when using a RabbitListener?


Solution

  • The listener adapter takes care of the ack when the Mono is completed.

    See AbstractAdaptableMessageListener.asyncSuccess() and asyncFailure().

    EDIT

    I am not a reactor person but, as far as I can tell, completing a Mono<Void> does nothing so the on...() methods are never called.

    You can ack or reject the delivery manually using channel.basicAck or channel.basicReject:

    @RabbitListener(queues = "foo")
    public void listen(String in, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
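        // Process the message, then call channel.basicAck(tag, false)
        // or channel.basicReject(tag, false).
        ...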
    }
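
To illustrate the manual approach in a non-blocking style, here is a minimal sketch that acks on successful completion and rejects on failure. It is not the Spring AMQP implementation. So that it runs with the JDK alone, it makes two substitutions that are assumptions of this example: `CompletableFuture<Void>` stands in for `Mono<Void>`, and the hypothetical `AckChannel` interface stands in for `com.rabbitmq.client.Channel`, copying only the `basicAck(long, boolean)` and `basicReject(long, boolean)` signatures. `ManualAckSketch`, `RecordingChannel` and `listen` are likewise illustrative names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ManualAckSketch {

    /** Hypothetical stand-in for the two com.rabbitmq.client.Channel methods used here. */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        void basicReject(long deliveryTag, boolean requeue) throws IOException;
    }

    /** Records calls instead of talking to a broker (illustration only). */
    static class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag);
        }

        @Override
        public void basicReject(long deliveryTag, boolean requeue) {
            calls.add("reject:" + deliveryTag + ":requeue=" + requeue);
        }
    }

    /**
     * Registers a completion callback on the async work and returns immediately.
     * The delivery is acked if the work succeeds and rejected (no requeue) if it fails.
     */
    static CompletableFuture<Void> listen(CompletableFuture<Void> work,
                                          AckChannel channel, long tag) {
        return work.<Void>handle((ignored, error) -> {
            try {
                if (error == null) {
                    channel.basicAck(tag, false);     // ack this delivery only
                } else {
                    channel.basicReject(tag, false);  // reject without requeueing
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return null;
        });
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();

        // Successful work leads to an ack.
        listen(CompletableFuture.<Void>completedFuture(null), channel, 1L).join();

        // Failed work leads to a reject.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        listen(failed, channel, 2L).join();

        System.out.println(channel.calls); // prints [ack:1, reject:2:requeue=false]
    }
}
```

In a real listener the same shape would apply to the `channel` and `tag` parameters shown above, with the ack or reject issued from whatever completion and error callbacks the reactive type provides, so the listener thread never blocks waiting for the result.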