I have a spring-webflux app which must consume messages from RabbitMQ. In previous apps, when NOT using spring-webflux, I've been able to:
In spring-webflux I'm not able to throw an error; I just have a MonoError. How do I trigger a retry?
My code currently looks something like this:
    @Component
    @RequiredArgsConstructor
    public class vehicleUpdateListener {

        private final VehicleService service;
        private final OperationFactory operationFactory;

        @RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
        void handleVehicleUpdated(Message message) {
            Mono.just(message)
                .map(operationFactory::generateOperationFromMessage)
                .flatMap(service::handleOperation) // want to retry if downstream app is down
                .subscribe();
        }
    }
EDIT
I have now worked out that it is possible. If client code returns, for example, an error Mono (one created with Mono.error(...)), then this will trigger retries. Likewise, I could conditionally trigger retries by mapping to an error Mono. For example, if I want to trigger a retry when a product from a message does not exist, I could do the following:
    repository.findByProductId(product.getProductId())
        .hasElement()
        .filter(exists -> !exists)
        .flatMap(missing -> Mono.error(new Exception("my exception")))
        .then(...) // carry on if it does exist
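
A minimal, self-contained sketch of the same idea, assuming reactor-core is on the classpath and Java 9 or later. It swaps the hasElement()/filter()/flatMap() chain for switchIfEmpty(Mono.error(...)), which is a different operator choice and not the code from the post. The class name, the in-memory PRODUCTS map, and the handle method are hypothetical stand-ins for the real repository and service.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Mono;

public class MissingProductExample {

    // Hypothetical in-memory stand-in for the reactive repository in the post.
    static final Map<String, String> PRODUCTS =
            new ConcurrentHashMap<>(Map.of("p-1", "Widget"));

    // Emits the product name if present, otherwise completes empty.
    static Mono<String> findByProductId(String productId) {
        return Mono.justOrEmpty(PRODUCTS.get(productId));
    }

    // Carries on when the product exists; signals an error when it does not.
    static Mono<String> handle(String productId) {
        return findByProductId(productId)
                .switchIfEmpty(Mono.error(
                        () -> new IllegalStateException("Unknown product: " + productId)))
                .map(name -> "updated " + name);
    }

    public static void main(String[] args) {
        // Known product: the pipeline completes with a value.
        System.out.println(handle("p-1").block());

        // Unknown product: the pipeline terminates with an error signal.
        handle("p-2").subscribe(
                value -> System.out.println("unexpected value: " + value),
                error -> System.out.println("error signal: " + error.getMessage()));
    }
}
```

Whether the failure then leads to a redelivery depends on the error actually reaching the listener container, for example by returning the Mono from the listener method if the Spring AMQP version in use supports that.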
Using Reactor with a non-reactive listener container has many challenges.
Consider looking at the https://github.com/reactor/reactor-rabbitmq project instead of Spring AMQP. At some point in the future we hope to build reactive @RabbitListeners, but they are not there yet.
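
For reference, a rough sketch of what consuming with reactor-rabbitmq and manual acknowledgements might look like. It assumes reactor-rabbitmq and the RabbitMQ Java client are on the classpath and that a broker is reachable on localhost with default settings. The queue name, the handle method, and the settle helper are hypothetical; requeue-on-error is just one possible policy, not something the library mandates.

```java
import com.rabbitmq.client.ConnectionFactory;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;

public class ReactiveVehicleUpdateConsumer {

    // Hypothetical queue name; the post only shows the constant, not its value.
    static final String VEHICLE_UPDATE_QUEUE = "vehicle.update";

    // Hypothetical stand-in for service::handleOperation: fails on an empty body.
    static Mono<Void> handle(byte[] body) {
        return body.length == 0
                ? Mono.error(new IllegalArgumentException("empty message"))
                : Mono.empty();
    }

    // Runs ack when the work completes, requeue when it fails.
    // The error is swallowed after requeueing so the consuming Flux keeps running.
    static Mono<Void> settle(Mono<Void> work, Runnable ack, Runnable requeue) {
        return work
                .doOnSuccess(ignored -> ack.run())
                .onErrorResume(error -> {
                    requeue.run();
                    return Mono.empty();
                });
    }

    public static void main(String[] args) {
        // Defaults to localhost:5672 with the guest account (assumption about the setup).
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Receiver receiver = RabbitFlux.createReceiver(
                new ReceiverOptions().connectionFactory(connectionFactory));

        receiver.consumeManualAck(VEHICLE_UPDATE_QUEUE)
                // concatMap handles one delivery at a time, in order.
                .concatMap(delivery -> settle(
                        handle(delivery.getBody()),
                        () -> delivery.ack(),
                        () -> delivery.nack(true))) // true = ask the broker to requeue
                .blockLast(); // keeps the demo alive while consuming
    }
}
```

Requeueing immediately can redeliver a failing message in a tight loop, so a dead-letter exchange or a retry limit is probably worth adding in a real consumer.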