Search code examples
javamonofluxproject-reactorreactor

Mono in Flux by single thread


I have Flux<Foo> from db (for example 5 elements). I need to get some info from each Foo, set all of it to Mono<MyRequest>, send to another rest resource, get a Mono<MyResponse> and use all info from it in each Foo.

I did it in Flux.flatMap() with a lot Mono.zipWith() and Mono.zipWhen(), but creating MyRequest and sending to resource occur 5 times by 5 threads.

Flux<Foo> flux = dao.getAll();
Flux<Foo> fluxAfterProcessing = flux.flatMap(foo -> monoFilters.map(...));
Mono<Tuple2<MyRequest, MyResponse>> mono = 
                      monoFilters.flatMap(filter -> monoRequest.map(...))
                                 .zipWhen(request -> api.send(request))
                                 .flatMap(tuple -> monoResponseFilters.map(...));
return fluxAfterProcessing.flatMap(foo -> 
                       monoResponseFilters.zipWith(mono).map(...))

How can i process my Mono functions only once by 1 Thread inside Flux?


Solution

  • Let's assume that what this task is reads like this:

    • Get some values from database
    • When all values arrive, wrap them in request and send out
    • Zip result with response

    Then this leads us to something like this:

    Flux<Foo> foos = dao.getAll();
    Mono<List<Foo>> everything = foos.collectList();
    
    Mono<MyRequest> request = everything
        // collect the data into another Mono, then into request
        .map(list -> list.stream().map(Foo::getData).collect(toList()))
        .map(data -> new MyRequest(data));
    
    return request.zipWhen(request -> api.send(request));
    

    Alternatively, you can collect build request a little easier if you map the initial foos:

    Flux<Data> data = dao.getAll().map(Foo::getData);
    Mono<MyRequest> request = data.collectList().map(MyRequest::new);