I want to send n-number of requests to a REST endpoint in parallel.I want to make sure these get executed in different threads for performance and need to wait till all n requests finish.
Only way I could come up with is using CountDownLatch as follows (please check the main() method. This is testable code):
public static void main(String args[]) throws Exception {
int n = 10; //n is dynamic during runtime
final CountDownLatch waitForNRequests = new CountDownLatch(n);
//send n requests
for (int i =0;i<n;i++) {
var r = testRestCall(""+i);
r.publishOn(Schedulers.parallel()).subscribe(res -> {
System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
waitForNRequests.countDown();
});
}
waitForNRequests.await(); //wait till all n requests finish before goto the next line
System.out.println("All n requests finished");
Thread.sleep(10000);
}
public static Mono<ResponseEntity<Map>> testRestCall(String id) {
WebClient client = WebClient.create("https://reqres.in/api");
JSONObject request = new JSONObject();
request.put("name", "user"+ id);
request.put("job", "leader");
var res = client.post().uri("/users")
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(request.toString()))
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.toEntity(Map.class)
.onErrorReturn(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build());
return res;
}
This doesnt look good and I am sure there is an elegant solution without using Latches..etc I tried following method,but I dont know how to resolve following issues:
List<Mono<ResponseEntity<Map>>> lst = new ArrayList<>();
int n = 10; //n is dynamic during runtime
for (int i =0;i<n;i++) {
var r = testRestCall(""+i);
lst.add(r);
}
var t= Flux.fromIterable(lst).flatMap(Function.identity()); //tried merge() contact() as well
t.publishOn(Schedulers.parallel()).subscribe(res -> {
System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
///??? all requests execute in a single thread.How to parallelize ?
});
//???How to wait till all n requests finish before goto the next line
System.out.println("All n requests finished");
Thread.sleep(10000);
Update:
I found the reason why the Flux subscriber runs in the same thread, I need to create a ParallelFlux. So the correct order should be:
var t= Flux.fromIterable(lst).flatMap(Function.identity());
t.parallel()
.runOn(Schedulers.parallel())
.subscribe(res -> {
System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
///??? all requests execute in a single thread.How to parallelize ?
});
Ref: https://projectreactor.io/docs/core/release/reference/#advanced-parallelizing-parralelflux
In reactive you think not about threads but about concurrency.
Reactor executes non-blocking/async tasks on a small number of threads using Schedulers
abstraction to execute tasks. Schedulers
have responsibilities very similar to ExecutorService
. By default, for parallel scheduler number of threads is equal to number of CPU cores, but could be controlled by `reactor.schedulers.defaultPoolSize’ system property.
In your example instead of creating multiple Mono
and then merge them, better to use Flux
and then process elements in parallel controlling concurrency.
Flux.range(1, 10)
.flatMap(this::testRestCall)
By default, flatMap will process Queues.SMALL_BUFFER_SIZE = 256
number of in-flight inner sequences.
You could control concurrency flatMap(item -> process(item), concurrency)
or use concatMap
operator if you want to process sequentially. Check flatMap(..., int concurrency, int prefetch) for details.
Flux.range(1, 10)
.flatMap(i -> testRestCall(i), 5)
The following test shows that calls are executed in different threads
@Test
void testParallel() {
var flow = Flux.range(1, 10)
.flatMap(i -> testRestCall(i))
.log()
.then(Mono.just("complete"));
StepVerifier.create(flow)
.expectNext("complete")
.verifyComplete();
}
The result log
2022-12-30 21:31:25.169 INFO 43383 --- [ctor-http-nio-4] reactor.Mono.FlatMap.3 : | onComplete()
2022-12-30 21:31:25.170 INFO 43383 --- [ctor-http-nio-3] reactor.Mono.FlatMap.2 : | onComplete()
2022-12-30 21:31:25.169 INFO 43383 --- [ctor-http-nio-2] reactor.Mono.FlatMap.1 : | onComplete()
2022-12-30 21:31:25.169 INFO 43383 --- [ctor-http-nio-8] reactor.Mono.FlatMap.7 : | onComplete()
2022-12-30 21:31:25.169 INFO 43383 --- [tor-http-nio-11] reactor.Mono.FlatMap.10 : | onComplete()
2022-12-30 21:31:25.169 INFO 43383 --- [ctor-http-nio-7] reactor.Mono.FlatMap.6 : | onComplete()
2022-12-30 21:31:25.169 INFO 43383 --- [ctor-http-nio-9] reactor.Mono.FlatMap.8 : | onComplete()
2022-12-30 21:31:25.170 INFO 43383 --- [ctor-http-nio-6] reactor.Mono.FlatMap.5 : | onComplete()
2022-12-30 21:31:25.378 INFO 43383 --- [ctor-http-nio-5] reactor.Mono.FlatMap.4 : | onComplete()