Search code examples
javamonomicroservicesspring-webfluxflux

How to invoke multiple services for a flux / mono in java reactive?


I am new to reactive world, might sound a newbee, I have a flux of product having size 20-30, and for each product i need to fetch the below from different microservices:

  1. average review count
  2. totalCommentCount
  3. wishlistedCount
  4. variants..
  5. .. 6 ..

What i have tried..

1. doOnNext

Flux<Product> products = ...
products
.doOnNext(product -> updateReviewCount)
.doOnNext(product -> updateTotalCommentCount)
.doOnNext(product -> updateWishlistedCount)
.doOnNext(product -> updateVariants)
...

This turns out to block the chain for each call for each product..

e.g.
Total records(20) * No. of service calls(5) * Time per service calls(30 ms) = 3000ms 

But time will grow with the number of records || number of service calls.

2. map using map i updated and returned same reference, but the results were same.

3. collected all as list and executed aggregate query to downstream services

Flux<Product> products = ...
products
.collectList() // Mono<List<Product>>
.doOnNext(productList -> updateReviewCountOfAllInList)
.doOnNext(productList -> updateFieldB_ForAllInList)
.doOnNext(productList -> updateFieldC_ForAllInList)
.doOnNext(productList -> updateFieldD_ForAllInList)
...

This did increase the performance, although now the downstream application has to return more data for a query, so little time increased on downstream side but that's okay.

Now with this, i was able to achieve time as below... Total records(combined as list , so 1) * No. of service calls(5) * Time per service calls(50 ms as time increased) = 250ms

But time will grow with the number of service calls.

Now i need to parallelize these service calls and execute these service calls in parallel and update their respective fields on the same product instance (same reference). Some like below

Flux<Product> products = ... // of 10 products
products
.collectList() // Mono<List<Product>>
.doAllInParallel(serviceCall1, serviceCall2, serviceCall3...)

. // get all updated products // flux size of 10

With that i want to achieve time... 250/5 = 50ms

How to achieve that? I found different articles, but i am not sure on what's the best way to do it? Can someone please help me on the same.


Solution

  • it worked using

    products // Flux<Product>
    .collectList() // Mono<List<Product>>
    .flatMap(list -> Mono.zip( this.call1(list) ,this.call2(list) ) ) // will return Tuple
    .map(t -> t.getT1) 
    .flatMapIterable(i->i)
    
    Mono<Product> call1(List<Product> productList){
       // some logic
    }
    Mono<Product> call2(List<Product> productList){
       // some logic
    }
    

    Actually zip and flatmapiterable , all could be done in a single step as well.. here's its just for demo.