java, spring-webflux, project-reactor, spring-webclient

Spring WebFlux | How to wait till a list of Monos finish execution in parallel


I want to send an n-number of requests to a REST endpoint in parallel. I want to make sure these get executed on different threads for performance, and I need to wait till all n requests finish.

The only way I could come up with is using a CountDownLatch, as follows (please check the main() method; this is testable code):

        public static void main(String args[]) throws Exception {
            
            int n = 10; //n is dynamic during runtime
    
            final CountDownLatch waitForNRequests = new CountDownLatch(n);
            //send n requests
            for (int i =0;i<n;i++) {
                var r = testRestCall(""+i);
                r.publishOn(Schedulers.parallel()).subscribe(res -> {
                    System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
                    
                    waitForNRequests.countDown();
                });
            }
    
    
            waitForNRequests.await(); //wait till all n requests finish before going to the next line
    
            System.out.println("All n requests finished");
            Thread.sleep(10000); 
        }

 

        public static Mono<ResponseEntity<Map>> testRestCall(String id) {

            WebClient client = WebClient.create("https://reqres.in/api");

            JSONObject request = new JSONObject();
            request.put("name", "user" + id);
            request.put("job", "leader");

            var res = client.post().uri("/users")
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(BodyInserters.fromValue(request.toString()))
                    .accept(MediaType.APPLICATION_JSON)
                    .retrieve()
                    .toEntity(Map.class)
                    .onErrorReturn(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build());
            return res;
        }

This doesn't look good, and I am sure there is an elegant solution without using latches, etc. I tried the following method, but I don't know how to resolve these two issues:

  1. Flux.merge() and concat() result in executing all n requests on a single thread
  2. How to wait till all n requests finish execution (fork-join)?
List<Mono<ResponseEntity<Map>>> lst = new ArrayList<>();
int n = 10; //n is dynamic during runtime
for (int i =0;i<n;i++) {
    var r = testRestCall(""+i);
    lst.add(r);
}

var t = Flux.fromIterable(lst).flatMap(Function.identity()); // tried merge() and concat() as well
t.publishOn(Schedulers.parallel()).subscribe(res -> {
            System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
            ///??? all requests execute in a single thread. How to parallelize?
        });

//??? How to wait till all n requests finish before going to the next line
System.out.println("All n requests finished");

Thread.sleep(10000); 

Update:

I found the reason why the Flux subscriber runs in the same thread: I need to create a ParallelFlux. So the correct order should be:

var t= Flux.fromIterable(lst).flatMap(Function.identity()); 
t.parallel()
 .runOn(Schedulers.parallel())
 .subscribe(res -> {
            System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
            // responses are now handled on different parallel scheduler threads
        });

Ref: https://projectreactor.io/docs/core/release/reference/#advanced-parallelizing-parralelflux
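
What is still missing is the waiting part. One possible way to do that without a CountDownLatch (a minimal sketch, reusing the lst of Monos built above) is to fold the parallel rails back into a sequential Flux and block on its completion signal:

// Minimal sketch: sequential() merges the parallel rails back into a Flux,
// then() gives a Mono<Void> that completes once every inner Mono has completed,
// and block() waits for that signal (acceptable here because this is a plain
// main method, not part of a reactive pipeline).
Flux.fromIterable(lst)
    .flatMap(Function.identity())
    .parallel()
    .runOn(Schedulers.parallel())
    .doOnNext(res -> System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" + res.getBody()))
    .sequential()
    .then()
    .block(); // returns only after all n requests have finished

System.out.println("All n requests finished");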


Solution

  • In reactive programming you think not about threads but about concurrency.

    Reactor executes non-blocking/async tasks on a small number of threads using the Schedulers abstraction. Schedulers have responsibilities very similar to an ExecutorService. By default, the parallel scheduler's number of threads equals the number of CPU cores, but it can be controlled with the `reactor.schedulers.defaultPoolSize` system property.
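
    For illustration, a quick way to check the pool size the parallel scheduler will use (a minimal sketch; the command-line flag below is just an example value):

    // Schedulers.DEFAULT_POOL_SIZE is initialized from the
    // reactor.schedulers.defaultPoolSize system property (falling back to the
    // number of CPU cores), so the property has to be set before the Schedulers
    // class is loaded, typically on the command line:
    //   java -Dreactor.schedulers.defaultPoolSize=8 -jar app.jar
    System.out.println("parallel scheduler pool size: " + Schedulers.DEFAULT_POOL_SIZE);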

    In your example, instead of creating multiple Monos and then merging them, it is better to use a Flux and then process the elements in parallel while controlling the concurrency.

    Flux.range(1, 10)
        .flatMap(i -> testRestCall(String.valueOf(i)))
    

    By default, flatMap will process up to Queues.SMALL_BUFFER_SIZE = 256 in-flight inner sequences.

    You can control the concurrency with flatMap(item -> process(item), concurrency), or use the concatMap operator if you want to process elements sequentially. Check flatMap(..., int concurrency, int prefetch) for details.

    Flux.range(1, 10)
        .flatMap(i -> testRestCall(String.valueOf(i)), 5)
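
    If the calling code really has to wait until every request has finished (for example in a plain main method rather than inside a larger reactive pipeline), one option, sketched below with the testRestCall method from the question, is to reduce the Flux to a Mono<Void> and block on it:

    // Minimal sketch: then() completes once every call has finished, and
    // block() waits for that completion signal on the calling thread.
    Flux.range(1, 10)
        .flatMap(i -> testRestCall(String.valueOf(i)), 5) // at most 5 concurrent calls
        .then()
        .block();

    System.out.println("All n requests finished");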
    

    The following test shows that the calls are executed on different threads:

    @Test
    void testParallel() {
        var flow = Flux.range(1, 10)
                .flatMap(i -> testRestCall(String.valueOf(i)))
                .log()
                .then(Mono.just("complete"));
    
        StepVerifier.create(flow)
                .expectNext("complete")
                .verifyComplete();
    
    }
    

    The result log

    2022-12-30 21:31:25.169  INFO 43383 --- [ctor-http-nio-4] reactor.Mono.FlatMap.3                   : | onComplete()
    2022-12-30 21:31:25.170  INFO 43383 --- [ctor-http-nio-3] reactor.Mono.FlatMap.2                   : | onComplete()
    2022-12-30 21:31:25.169  INFO 43383 --- [ctor-http-nio-2] reactor.Mono.FlatMap.1                   : | onComplete()
    2022-12-30 21:31:25.169  INFO 43383 --- [ctor-http-nio-8] reactor.Mono.FlatMap.7                   : | onComplete()
    2022-12-30 21:31:25.169  INFO 43383 --- [tor-http-nio-11] reactor.Mono.FlatMap.10                  : | onComplete()
    2022-12-30 21:31:25.169  INFO 43383 --- [ctor-http-nio-7] reactor.Mono.FlatMap.6                   : | onComplete()
    2022-12-30 21:31:25.169  INFO 43383 --- [ctor-http-nio-9] reactor.Mono.FlatMap.8                   : | onComplete()
    2022-12-30 21:31:25.170  INFO 43383 --- [ctor-http-nio-6] reactor.Mono.FlatMap.5                   : | onComplete()
    2022-12-30 21:31:25.378  INFO 43383 --- [ctor-http-nio-5] reactor.Mono.FlatMap.4                   : | onComplete()