Search code examples

How do I trigger retries when using Spring AMQP @RabbitListener in reactive Spring-Webflux app

I have a spring-webflux app which must consume messages from rabbitMQ. In previous apps when NOT using spring-webflux I've been able to:

  1. Configure a retry policy when declaring the queue
  2. Setup a rabbit listener using the @RabbitListener annotation
  3. Trigger a retry by throwing an exception in the handler function

In spring-webflux I'm not able to throw an error, I just have a MonoError, how do I trigger a retry?

My code looks like something like this currently

public class vehicleUpdateListener {

  private final VehicleService service;
  private final OperationFactory operationFactory;

  @RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
  void handleVehicleUpdated(Message message) {
      .flatMap(service::handleOperation) // want to retry if downstream app is down


I have now worked out that it is possible. If client code for example returns a Mono<Exception> then this will trigger retries. Likewise I could conditionally trigger retries my mapping to a Mono<Exception>. For example if I want to trigger a retry when a product from a message does not exist, I could do the following

        .filter(exists -> !exists)
        .flatMap(missing -> Mono.error(new Exception("my exception")))
        .then(...) // carry on if it does exist


  • Using reactor with a non-reactive listener container has many challenges.

    1. You must use MANUAL acks and ack/nack the delivery after the reactive flow completes.
    2. You must use reactor's retry mechanisms.

    Consider looking at the project instead of Spring AMQP. At some time in the future we hope to build reactive @RabbitListeners, but they are not there yet.