I'm using Spring AMQP 2.1.6 for consuming messages with a RabbitListener
returning a Mono<Void>
. For example:
@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
Mono<Void> mono = myService.doSomething(myMessage);
return mono;
}
Reading the documentation it says:
The listener container factory must be configured with AcknowledgeMode.MANUAL so that the consumer thread will not ack the message; instead, the asynchronous completion will ack or nack the message when the async operation completes.
I've thus configured the container factory with AcknowledgeMode.MANUAL
, but it's not clear to me whether "the asynchronous completion will ack or nack the message when the async operation completes" means that this is handled by spring-amqp
itself or if this is something that I have to do? I.e. do I have to the ack/nack the message after the call to myService.doSomething(myMessage)
or does Spring AMQP automatically ack's it since I'm returning a Mono
(even though AcknowledgeMode.MANUAL
is set)?
If it's the case that I need to manually send ack's or reject's, what's the idiomatic way to do this in a non-blocking manner when using the RabbitListener
?
The listener adapter takes care of the ack when the Mono is completed.
See AbstractAdaptableMessageListener.asyncSuccess()
and asyncFailure()
.
EDIT
I am not a reactor person but, as far as I can tell, completing a Mono<Void>
does nothing so the on...()
methods are never called.
You can ack the delivery manually using channel.basicAck
or basicReject
...
@RabbitListener(queues = "foo")
public void listen(String in, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
...
}