I have a Flux<Foo> from the DB (for example, 5 elements).
I need to get some info from each Foo, put all of it into a Mono<MyRequest>, send that to another REST resource, get a Mono<MyResponse> back, and use the info from it in each Foo.
I did it in Flux.flatMap() with a lot of Mono.zipWith() and Mono.zipWhen() calls, but creating MyRequest and sending it to the resource happens 5 times, on 5 threads.
Flux<Foo> flux = dao.getAll();
Flux<Foo> fluxAfterProcessing = flux.flatMap(foo -> monoFilters.map(...));
Mono<Tuple2<MyRequest, MyResponse>> mono =
    monoFilters.flatMap(filter -> monoRequest.map(...))
        .zipWhen(request -> api.send(request))
        .flatMap(tuple -> monoResponseFilters.map(...));
return fluxAfterProcessing.flatMap(foo ->
    monoResponseFilters.zipWith(mono).map(...));
How can I run my Mono functions only once, on a single thread, inside the Flux?
Let's assume the task reads like this: load every Foo, gather the data from all of them into a single MyRequest, and send that request once.
That leads us to something like this:
Flux<Foo> foos = dao.getAll();
Mono<List<Foo>> everything = foos.collectList();
Mono<MyRequest> request = everything
// collect the data into another Mono, then into request
.map(list -> list.stream().map(Foo::getData).collect(toList()))
.map(data -> new MyRequest(data));
// the lambda parameter needs its own name: reusing `request` would clash with the local variable
return request.zipWhen(req -> api.send(req));
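The pipeline above ends with the request/response pair, while the question also wants the response applied back to each Foo. One way to do that is to keep the collected list in scope and fan it out again with flatMapMany. The following is only a sketch under assumptions: the Foo, MyRequest and MyResponse records, the send method and the sendCalls counter are hypothetical stand-ins, not the real dao or api from the question.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SingleRequestSketch {
    // Hypothetical stand-ins for the types in the question.
    record Foo(String data, String info) {}
    record MyRequest(List<String> data) {}
    record MyResponse(String info) {}

    // Counts how often the fake remote call actually runs.
    static final AtomicInteger sendCalls = new AtomicInteger();

    // Hypothetical replacement for api.send(request).
    static Mono<MyResponse> send(MyRequest request) {
        return Mono.fromSupplier(() -> {
            sendCalls.incrementAndGet();
            return new MyResponse("batch of " + request.data().size());
        });
    }

    static Flux<Foo> process(Flux<Foo> foos) {
        return foos.collectList().flatMapMany(list -> {
            // Build one request from the data of every Foo.
            MyRequest request = new MyRequest(list.stream().map(Foo::data).toList());
            // Send it once, then re-emit each Foo enriched with the response.
            return send(request).flatMapMany(response ->
                Flux.fromIterable(list)
                    .map(foo -> new Foo(foo.data(), response.info())));
        });
    }

    public static void main(String[] args) {
        Flux<Foo> foos = Flux.just(new Foo("a", null), new Foo("b", null), new Foo("c", null));
        // block() is used only to keep the demo short; avoid it in reactive code paths.
        List<Foo> result = process(foos).collectList().block();
        System.out.println(result);
        System.out.println("send calls: " + sendCalls.get());
    }
}
```

Because send is subscribed to only once per subscription of the outer pipeline, the counter stays at 1 no matter how many Foo elements there are. The trade-off is that collectList buffers every element in memory before the request is built.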
Alternatively, you can build the request a little more easily if you map the initial foos first:
Flux<Data> data = dao.getAll().map(Foo::getData);
Mono<MyRequest> request = data.collectList().map(MyRequest::new);
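If the flatMap structure from the question has to stay, another option may be to call cache() on the shared Mono, so that later subscribers replay the first result instead of re-running the source. This is a sketch under assumptions: expensive() is a hypothetical stand-in for building and sending MyRequest, and plain strings replace Foo and MyResponse.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CachedMonoSketch {
    // Counts how often the expensive step really executes.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical expensive step, standing in for "build MyRequest and send it".
    static Mono<String> expensive() {
        return Mono.fromSupplier(() -> "response#" + calls.incrementAndGet());
    }

    // flatMap subscribes to `shared` once per element of `foos`.
    static Flux<String> combine(Flux<String> foos, Mono<String> shared) {
        return foos.flatMap(foo -> shared.map(response -> foo + " <- " + response));
    }

    public static void main(String[] args) {
        // cache() runs the source on the first subscription and replays the result afterwards.
        Mono<String> cached = expensive().cache();
        List<String> out = combine(Flux.just("a", "b", "c"), cached).collectList().block();
        System.out.println(out);
        System.out.println("expensive calls: " + calls.get());
    }
}
```

Without the cache() call, every element would trigger its own run of expensive(), which matches the "5 times" behaviour described in the question. Note that cache() only deduplicates the work; it does not pin execution to a particular thread.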