java spring spring-webflux spring-amqp reactive

How do I trigger retries when using the Spring AMQP @RabbitListener in a reactive Spring WebFlux app?


I have a spring-webflux app which must consume messages from RabbitMQ. In previous apps that did NOT use spring-webflux, I was able to:

  1. Configure a retry policy when declaring the queue
  2. Set up a rabbit listener using the @RabbitListener annotation
  3. Trigger a retry by throwing an exception in the handler function

In spring-webflux I can't throw an exception from the handler; all I have is a Mono that terminates with an error (a MonoError). How do I trigger a retry?

My code currently looks something like this:

@Component
@RequiredArgsConstructor
public class VehicleUpdateListener {

  private final VehicleService service;
  private final OperationFactory operationFactory;

  @RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
  void handleVehicleUpdated(Message message) {
    Mono.just(message)
      .map(operationFactory::generateOperationFromMessage)
      .flatMap(service::handleOperation) // want to retry if downstream app is down
      .subscribe();
  }
}

EDIT

I have now worked out that it is possible. If client code returns a Mono that signals an error (for example via Mono.error(...)), this will trigger retries. Likewise, I can conditionally trigger retries by mapping to an error Mono. For example, to trigger a retry when the product named in a message does not exist, I could do the following:

repository.findByProductId(product.getProductId())
        .hasElement()
        .filter(exists -> !exists)
        .flatMap(missing -> Mono.error(new Exception("my exception")))
        .then(...) // carry on if it does exist

Solution

  • Using Reactor with a non-reactive listener container has many challenges.

    1. You must use MANUAL acks and ack/nack the delivery after the reactive flow completes.
    2. You must use Reactor's retry mechanisms.

    Consider looking at the https://github.com/reactor/reactor-rabbitmq project instead of Spring AMQP. At some point in the future we hope to build reactive @RabbitListeners, but they are not there yet.
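
The two requirements above can be sketched with plain Reactor operators. This is only an illustration under stated assumptions, not the answerer's code: the names `ReactiveAck`, `retryThenSettle`, and `defaultRetry` are hypothetical, the retry counts and delays are arbitrary, and it assumes a reactor-core version that provides `reactor.util.retry.Retry`. The ack and nack actions are passed in as `Runnable`s so the helper stays independent of the RabbitMQ client.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/** Hypothetical helper: retry a reactive flow, then ack or nack once it has finished. */
final class ReactiveAck {

  private ReactiveAck() {}

  /**
   * Resubscribes to {@code flow} according to {@code retrySpec}.
   * Runs {@code ack} if the flow eventually completes successfully,
   * or {@code nack} if it still fails after the retries are used up.
   */
  static <T> Mono<Void> retryThenSettle(
      Mono<T> flow, Retry retrySpec, Runnable ack, Runnable nack) {
    return flow
        // Reactor-level retry: each retry is a fresh subscription to the flow.
        .retryWhen(retrySpec)
        // Subscribed only after successful completion of the flow.
        .then(Mono.<Void>fromRunnable(ack))
        // Reached when retries are exhausted (or if the ack action itself throws).
        .onErrorResume(error -> Mono.<Void>fromRunnable(nack));
  }

  /** One possible policy (an assumption): 3 retries, exponential backoff from 1 second. */
  static Retry defaultRetry() {
    return Retry.backoff(3, Duration.ofSeconds(1));
  }
}
```

In the listener from the question, one way to wire this up (assuming a Spring AMQP version whose `@RabbitListener` supports the `ackMode` attribute) would be to declare `@RabbitListener(queues = VEHICLE_UPDATE_QUEUE, ackMode = "MANUAL")`, add a `com.rabbitmq.client.Channel` parameter, read the tag with `message.getMessageProperties().getDeliveryTag()`, and pass `channel.basicAck(tag, false)` and `channel.basicNack(tag, false, true)` as the two callbacks, each wrapped to handle the checked `IOException`. The returned `Mono` still has to be subscribed, and the callbacks run on whichever thread completes the flow rather than on the container's consumer thread.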