java reactive-programming spring-webflux project-reactor

What is the proper way to wait until all Mono responses are returned from downstream APIs?


I'm quite new to Mono and Flux. I'm trying to join several downstream API responses in a traditional blocking application. I don't want to collect a list of Mono; I want a List of the payloads returned by the downstream APIs, which I extract from each Mono. However, the 'result' returned to the controller sometimes contains only some, or none, of the downstream API responses. What is the correct way to do this? I've read several posts, including "How to iterate Flux and mix with Mono states", which says:

"You should not call subscribe anywhere in a web application. If this is bound to an HTTP request, you're basically triggering the reactive pipeline with no guarantee about resources or completion. Calling subscribe triggers the pipeline but does not wait until it's complete."

Should I be using CompletableFuture?

In my service I attempted:

var result = new ArrayList<List<X>>();
List<Mono<X>> monoList = apiCall();
Flux.fromIterable(monoList)
    .flatMap(m -> m.doOnSuccess(x -> {
        result.add(x.getData());
    }))
    .subscribe();

I also tried the following in the controller, but the method returns without waiting for the subscription to complete:

var result = new ArrayList<List<X>>();
Flux.concat(
        this.service.callApis(result, ...)
).subscribe();
return result;

In my service:

public Mono<Void> callApis(List<List<X>> result, ..) {
    ...
    return Flux.fromIterable(monoList)
        .flatMap(m -> m.doOnSuccess(x -> {
            result.add(x.getData()...);
        }))
        .then();
}

Solution

  • The Project Reactor documentation (which is very good) has a section called "Which operator do I need?". You need to create a Flux from your API calls, combine the results, and then return to the synchronous world.

    In your case, it looks like all your downstream services have the same API, so they all return the same type, and the order in which the responses arrive doesn't really matter to your application. Also, I'm assuming that apiCall() returns a List<Mono<Response>>. You probably want something like:

    Flux.fromIterable(apiCall()) // Flux<Mono<Response>>
        .flatMap(mono -> mono) // Flux<Response>
        .map(response -> response.getData()) // Flux<List<X>>
        .collectList() // Mono<List<List<X>>>
        .block(); // List<List<X>>
    

    The fromIterable(...).flatMap(x->x) construct just converts your List<Mono<R>> into a Flux<R>.

    map() is used to extract the data part of your response.

    collectList() creates a Mono that waits until the Flux completes, and gives a single result containing all the data lists.

    block() subscribes to the Mono returned by the previous operator, and blocks until it is complete, which will (in this case) be when all the Monos returned by apiCall() have completed.
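
To make the four steps above concrete, here is a minimal, self-contained sketch. It assumes reactor-core is on the classpath. The Response class, its String payload, and the delayed fake calls behind apiCall() are hypothetical stand-ins for the real downstream client; none of them come from the question.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class JoinResponses {

    // Hypothetical response type standing in for the real downstream payload.
    static final class Response {
        private final List<String> data;

        Response(List<String> data) {
            this.data = data;
        }

        List<String> getData() {
            return data;
        }
    }

    // Hypothetical fake call: emits a Response after the given delay.
    static Mono<Response> fakeCall(long delayMillis, String... items) {
        return Mono.just(new Response(Arrays.asList(items)))
                .delayElement(Duration.ofMillis(delayMillis));
    }

    // Hypothetical apiCall(): three downstream calls that finish at different times.
    static List<Mono<Response>> apiCall() {
        return Arrays.asList(
                fakeCall(300, "a1", "a2"),
                fakeCall(100, "b1"),
                fakeCall(200, "c1", "c2"));
    }

    static List<List<String>> fetchAll() {
        return Flux.fromIterable(apiCall())     // Flux<Mono<Response>>
                .flatMap(mono -> mono)          // Flux<Response>; inner Monos run concurrently
                .map(Response::getData)         // Flux<List<String>>
                .collectList()                  // Mono<List<List<String>>>
                .block(Duration.ofSeconds(5));  // wait for every call, or fail after 5 seconds
    }

    public static void main(String[] args) {
        List<List<String>> result = fetchAll();
        System.out.println(result.size() + " responses: " + result);
    }
}
```

Because flatMap emits each result as soon as its inner Mono completes, the outer list is in completion order rather than in the order of the list returned by apiCall(). The timeout passed to block() is an addition in this sketch, so that a hung downstream call cannot block the caller forever.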

    There are many possible alternatives here, and which is most suitable will depend on your exact use case.
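
As an illustration, here are two alternatives that matter when the order of the results is significant. They are chosen as examples for this sketch and are not taken from the answer above. The sketch assumes reactor-core is on the classpath; fakeCall() and calls() are hypothetical stand-ins for the real downstream calls.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class OrderedJoin {

    // Hypothetical downstream call that emits its name after the given delay.
    static Mono<String> fakeCall(String name, long delayMillis) {
        return Mono.just(name).delayElement(Duration.ofMillis(delayMillis));
    }

    // Hypothetical list of calls; the slowest one is deliberately first.
    static List<Mono<String>> calls() {
        return Arrays.asList(
                fakeCall("slow", 300),
                fakeCall("fast", 50),
                fakeCall("medium", 150));
    }

    // All calls run concurrently, but results are emitted in source order.
    static List<String> concurrentInSourceOrder() {
        return Flux.fromIterable(calls())
                .flatMapSequential(mono -> mono)
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    // Each call is subscribed to only after the previous one has completed.
    static List<String> oneAtATime() {
        return Flux.concat(calls())
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(concurrentInSourceOrder());
        System.out.println(oneAtATime());
    }
}
```

Both methods return the results in the order of the input list. flatMapSequential keeps the concurrency of flatMap and only reorders the output, while Flux.concat trades latency for running one call at a time, which may suit a downstream service that should not receive parallel requests.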