I am new to the reactive world, so this may sound like a beginner question. I have a Flux of 20-30 products, and for each product I need to fetch several fields (review count, total comment count, wishlisted count, variants, and so on) from different microservices.
What I have tried:
1. doOnNext
Flux<Product> products = ...
products
.doOnNext(product -> updateReviewCount(product))
.doOnNext(product -> updateTotalCommentCount(product))
.doOnNext(product -> updateWishlistedCount(product))
.doOnNext(product -> updateVariants(product))
...
This blocks the chain on every call for every product, so all the calls run one after another.
For example:
Total records (20) * number of service calls (5) * time per service call (30 ms) = 3000 ms
The time grows with both the number of records and the number of service calls.
2. map
Using map, I updated each product and returned the same reference, but the result was the same.
3. Collected all products into a list and ran one aggregate query against each downstream service
Flux<Product> products = ...
products
.collectList() // Mono<List<Product>>
.doOnNext(productList -> updateReviewCountOfAllInList(productList))
.doOnNext(productList -> updateFieldB_ForAllInList(productList))
.doOnNext(productList -> updateFieldC_ForAllInList(productList))
.doOnNext(productList -> updateFieldD_ForAllInList(productList))
...
This did improve performance. The downstream services now have to return more data per query, so each call takes a little longer on their side, but that is acceptable.
With this I got the following:
Total records (combined into one list, so 1) * number of service calls (5) * time per service call (50 ms, since each call now returns more data) = 250 ms
But the time still grows with the number of service calls.
Now I need to run these service calls in parallel and have each one update its own fields on the same product instances (same references). Something like this:
Flux<Product> products = ... // of 10 products
products
.collectList() // Mono<List<Product>>
.doAllInParallel(serviceCall1, serviceCall2, serviceCall3...)
. // get all updated products // flux size of 10
With that I want to bring the total time down to roughly 250 / 5 = 50 ms.
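The estimates in this question all follow the same back-of-the-envelope model. Below is a minimal sketch of that arithmetic, using the latencies quoted here. The class and method names are made up for illustration, and the parallel case assumes the total is bounded by the slowest call, which for five equal 50 ms calls is the same as 250 / 5.

```java
// Rough latency model for the three approaches; all names are illustrative.
final class LatencyEstimate {

    // Every product waits for every service call, one after another.
    static long sequentialPerProduct(int records, int serviceCalls, long msPerCall) {
        return (long) records * serviceCalls * msPerCall;
    }

    // One aggregate call per service, still executed one after another.
    static long sequentialBatched(int serviceCalls, long msPerBatchCall) {
        return serviceCalls * msPerBatchCall;
    }

    // All aggregate calls in flight at once: bounded by the slowest call.
    static long parallelBatched(long... msPerBatchCall) {
        long slowest = 0;
        for (long ms : msPerBatchCall) {
            slowest = Math.max(slowest, ms);
        }
        return slowest;
    }

    public static void main(String[] args) {
        System.out.println(sequentialPerProduct(20, 5, 30));      // 3000
        System.out.println(sequentialBatched(5, 50));             // 250
        System.out.println(parallelBatched(50, 50, 50, 50, 50));  // 50
    }
}
```

In the parallel case, adding a sixth service of similar latency leaves the estimate unchanged, which is the property being asked for.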
How can I run these calls in parallel? I found several articles, but I am not sure which approach is best. Can someone please help?
It worked using Mono.zip:
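products // Flux<Product>
.collectList() // Mono<List<Product>>
.flatMap(list -> Mono.zip(this.call1(list), this.call2(list))) // Mono<Tuple2<List<Product>, List<Product>>>
.map(t -> t.getT1()) // assuming both calls update the same instances, either element works
.flatMapIterable(i -> i) // back to Flux<Product>
// Each call returns the list so that flatMapIterable can iterate over it.
Mono<List<Product>> call1(List<Product> productList) {
// some logic
}
Mono<List<Product>> call2(List<Product> productList) {
// some logic
}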
The zip and flatMapIterable steps could also be combined into a single step; they are kept separate here only for demonstration.
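For anyone who wants to try this end to end, here is a minimal runnable sketch of the same idea. It assumes reactor-core is on the classpath. The Product fields, the two fetch methods, and the simulated 50 ms latency are hypothetical stand-ins for the real service clients, not the actual code.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ParallelEnrichment {

    // Hypothetical product; each field is filled in by a different service.
    static final class Product {
        final String id;
        volatile int reviewCount;
        volatile int wishlistedCount;
        Product(String id) { this.id = id; }
    }

    // Stand-in for an aggregate call to a review service (simulated latency).
    static Mono<List<Product>> fetchReviewCounts(List<Product> products) {
        return Mono.delay(Duration.ofMillis(50)).map(tick -> {
            products.forEach(p -> p.reviewCount = 7);
            return products;
        });
    }

    // Stand-in for an aggregate call to a wishlist service (simulated latency).
    static Mono<List<Product>> fetchWishlistedCounts(List<Product> products) {
        return Mono.delay(Duration.ofMillis(50)).map(tick -> {
            products.forEach(p -> p.wishlistedCount = 3);
            return products;
        });
    }

    static Flux<Product> enrich(Flux<Product> products) {
        return products
                .collectList()
                // Mono.zip subscribes to both sources up front, so the two
                // delays overlap instead of running back to back.
                .flatMap(list -> Mono.zip(fetchReviewCounts(list), fetchWishlistedCounts(list))
                        // Both calls mutated the same instances; return the list itself.
                        .thenReturn(list))
                .flatMapIterable(list -> list);
    }

    public static void main(String[] args) {
        List<Product> result = enrich(Flux.just(new Product("a"), new Product("b")))
                .collectList()
                .block();
        result.forEach(p -> System.out.println(p.id + " " + p.reviewCount + " " + p.wishlistedCount));
    }
}
```

One caveat: the calls only overlap if each Mono is genuinely asynchronous. If call1 or call2 wraps a blocking client, Mono.zip will still run them one after another on the subscribing thread unless each is moved to its own thread, for example with subscribeOn(Schedulers.boundedElastic()).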