I'm quite new to Mono and Flux. I'm trying to join several downstream API responses in a traditional blocking application. I don't want to collect a list of Mono; I want a List of the payloads returned by the downstream APIs, which I extract from each Mono. However, the 'result' returned to the controller sometimes contains only some, or none, of the downstream API responses. What is the correct way to do this? I've read several posts, including "How to iterate Flux and mix with Mono states", which says:
"You should not call subscribe anywhere in a web application. If this is bound to an HTTP request, you're basically triggering the reactive pipeline with no guarantee about resources or completion. Calling subscribe triggers the pipeline but does not wait until it's complete."
Should I be using CompletableFuture?
In my service I tried:
var result = new ArrayList<List<X>>();
List<Mono<X>> monoList = apiCall();
Flux.fromIterable(monoList)
    .flatMap(m -> m.doOnSuccess(
        x -> {
            result.add(x.getData());
        }
    )).subscribe();
I also tried the following in the controller, but the method returns without waiting for subscribe to complete:
var result = new ArrayList<List<X>>();
Flux.concat(
    this.service.callApis(result, ...)
).subscribe();
return result;
In my service:
public Mono<Void> callApis(List<List<X>> result, ..) {
    ...
    return Flux.fromIterable(monoList)
        .flatMap(m -> m.doOnSuccess(
            x -> {
                result.add(x.getData()...);
            }
        )).then();
}
The Project Reactor documentation (which is very good) has a section called "Which operator do I need?". You need to create a Flux from your API calls, combine the results, and then return to the synchronous world.
In your case, it looks like all your downstream services have the same API, so they all return the same type and it doesn't really matter what order those responses appear in your application. Also, I'm assuming that apiCall() returns a List<Mono<Response>>. You probably want something like
Flux.fromIterable(apiCall()) // Flux<Mono<Response>>
.flatMap(mono -> mono) // Flux<Response>
.map(response -> response.getData()) // Flux<List<X>>
.collectList() // Mono<List<List<X>>>
.block(); // List<List<X>>
The fromIterable(...).flatMap(x->x) construct just converts your List<Mono<R>> into a Flux<R>.
map() is used to extract the data part of your response.
collectList() creates a Mono that waits until the Flux completes, and gives a single result containing all the data lists.
block() subscribes to the Mono returned by the previous operator, and blocks until it is complete, which will (in this case) be when all the Monos returned by apiCall() have completed.
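To make the pipeline above concrete, here is a minimal sketch of a blocking service method built around it. It assumes reactor-core is on the classpath. The Response class, the String payload type, and the stubbed apiCall() are hypothetical stand-ins for your real types and downstream calls.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class DownstreamAggregator {

    // Hypothetical response type standing in for the real downstream payload wrapper.
    static final class Response {
        private final List<String> data;

        Response(List<String> data) {
            this.data = data;
        }

        List<String> getData() {
            return data;
        }
    }

    // Hypothetical stand-in for apiCall(); real code would return Monos backed by HTTP calls.
    static List<Mono<Response>> apiCall() {
        return List.of(
                Mono.just(new Response(List.of("a", "b"))),
                Mono.just(new Response(List.of("c"))));
    }

    // Returns only after every Mono has completed, so the caller never sees a partial list.
    static List<List<String>> callApis() {
        return Flux.fromIterable(apiCall())   // Flux<Mono<Response>>
                .flatMap(mono -> mono)        // Flux<Response>
                .map(Response::getData)       // Flux<List<String>>
                .collectList()                // Mono<List<List<String>>>
                .block();                     // List<List<String>>
    }
}
```

The controller can then simply return DownstreamAggregator.callApis(). Nothing is added to a shared list from inside the pipeline, and there is no bare subscribe() call, which is what allowed the original method to return before the responses had arrived.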
There are many possible alternatives here, and which is most suitable will depend on your exact use case.
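On the CompletableFuture question: one such alternative has the same "start everything, wait for all, collect" shape using only the JDK. The sketch below is an illustration, not a recommendation over the Reactor pipeline. FutureAggregator, apiCallFutures(), and the String payloads are hypothetical; in a Reactor codebase each future could come from Mono.toFuture().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FutureAggregator {

    // Hypothetical stand-in for the downstream calls, each yielding one payload list.
    static List<CompletableFuture<List<String>>> apiCallFutures() {
        return List.of(
                CompletableFuture.supplyAsync(() -> List.of("a", "b")),
                CompletableFuture.supplyAsync(() -> List.of("c")));
    }

    static List<List<String>> callApis() {
        List<CompletableFuture<List<String>>> futures = apiCallFutures();

        // allOf completes only once every future has completed; join() blocks until then.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // Every future is already done, so these join() calls return immediately.
        // Results come back in the order of the futures list.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}
```

As with block(), join() rethrows a failure from any of the calls (wrapped in a CompletionException), so the caller sees either the full list or an exception, never a partial result.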