How to wait for subscribe to finish?


I want to make an asynchronous REST call using Spring WebClient, which gives me back a Mono. In parallel I also make some database calls, which for various reasons cannot be done reactively.

    Map<String, Object> models = new HashMap<>();

    Mono<User> users = this.webClient...;
    users.map(resp -> new UserState(userRequest, resp))
            .subscribe(response -> {
                models.put("userState", response);
            });
    Iterable<Product> messages = this.productRepository.findAll();
    models.put("products", messages);
    //Wait for users.subscribe to finish <<<<<<<<<<<<<HERE
    return new ModelAndView("messages/list", models);

How do I wait for the subscription to finish before returning the ModelAndView? This would be easy with a Future, where I could call get() whenever I needed the result.


Solution

  • You can wrap the blocking call in a Mono that runs on a separate scheduler, zip it with the Mono that yields the user, and map the pair into a Mono<ModelAndView> (which can be returned from Spring controller methods). The two calls run in parallel, and their results are combined once both have completed.

    You can define a single bounded scheduler per application specifically for blocking calls and provide it as a constructor argument to any class that makes blocking calls.

    The code will look as follows:

    @Configuration
    class SchedulersConfig {

      // Dedicated, size-capped scheduler for blocking calls such as repository access.
      @Bean
      Scheduler blockingScheduler(@Value("${blocking-thread-pool-size}") int threadsCount) {
        return Schedulers.newBoundedElastic(threadsCount, Integer.MAX_VALUE, "blocking-calls");
      }
    }

    @RestController
    class Controller {

      final Scheduler blockingScheduler;

      ...

      Mono<User> userResponse = // webClient...

      // Defer the blocking repository call and run it on the dedicated scheduler.
      Mono<Iterable<Product>> productsResponse = Mono.fromSupplier(productRepository::findAll)
        .subscribeOn(blockingScheduler);

      // The combinator runs once both Monos have emitted a value.
      return Mono.zip(userResponse, productsResponse, (user, products) ->
        new ModelAndView("messages/list",
          ImmutableMap.of(
            "userState", new UserState(userRequest, user),
            "products", products
          ))
      );
    }
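
For readers who want to run the fork-and-combine shape without a Spring or Reactor setup, here is a minimal JDK-only sketch. It is an analogue, not the Reactor code above: `CompletableFuture.supplyAsync` on a dedicated pool stands in for `subscribeOn` with the dedicated scheduler, and `thenCombine` stands in for `Mono.zip`. The names `ZipSketch`, `fetchUser`, `findAllProducts` and `buildModel` are hypothetical stand-ins for the WebClient call, the repository call and the controller method.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ZipSketch {

    // Hypothetical stand-in for the remote HTTP call.
    static String fetchUser() {
        return "alice";
    }

    // Hypothetical stand-in for the blocking repository call.
    static List<String> findAllProducts() {
        return List.of("book", "pen");
    }

    // Starts both calls, then merges their results once both have completed.
    static CompletableFuture<Map<String, Object>> buildModel(ExecutorService blockingPool) {
        CompletableFuture<String> user = CompletableFuture.supplyAsync(ZipSketch::fetchUser);
        // The blocking call runs on the dedicated pool, not on the caller's thread.
        CompletableFuture<List<String>> products =
                CompletableFuture.supplyAsync(ZipSketch::findAllProducts, blockingPool);
        return user.thenCombine(products,
                (u, p) -> Map.<String, Object>of("userState", u, "products", p));
    }

    public static void main(String[] args) {
        ExecutorService blockingPool = Executors.newFixedThreadPool(4);
        try {
            Map<String, Object> model = buildModel(blockingPool).join();
            System.out.println(model.get("userState") + " " + model.get("products"));
        } finally {
            blockingPool.shutdown();
        }
    }
}
```

As in the Reactor version, nothing waits on the caller's side until the combined result is actually needed; here that point is the single `join()` in `main`.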
    

    Update based on the comment:
    If you just need to execute the HTTP call asynchronously and then join it with the database results, you can do the following:

    Map<String, Object> models = new HashMap<>();
    Mono<User> userMono = webClient...;
    // toFuture() subscribes immediately, so the HTTP call starts here.
    CompletableFuture<User> userFuture = userMono.toFuture();
    // The blocking database call runs while the HTTP call is in flight.
    Iterable<Product> messages = productRepository.findAll();
    // Block until the HTTP response has arrived.
    User user = userFuture.join();
    models.put("products", messages);
    models.put("userState", new UserState(userRequest, user));
    return new ModelAndView("messages/list", models);
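
One thing to keep in mind with `join()` is that it waits indefinitely and rethrows failures wrapped in `CompletionException`. The following JDK-only sketch shows one possible way to bound the wait and fall back to a default. It assumes Java 9+ for `orTimeout`; `JoinSketch`, `startUserCall`, `awaitUser`, the two-second limit and the `"anonymous"` fallback are all hypothetical, with `startUserCall` standing in for the future obtained from `userMono.toFuture()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

class JoinSketch {

    // Hypothetical stand-in for the future returned by userMono.toFuture().
    static CompletableFuture<String> startUserCall(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("remote call failed");
            }
            return "alice";
        });
    }

    // Waits for the user with an upper bound; falls back if the call fails or times out.
    static String awaitUser(CompletableFuture<String> userFuture) {
        try {
            return userFuture.orTimeout(2, TimeUnit.SECONDS).join();
        } catch (CompletionException e) {
            // join() wraps the original failure (or a TimeoutException) in CompletionException.
            return "anonymous";
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitUser(startUserCall(false))); // prints "alice"
        System.out.println(awaitUser(startUserCall(true)));  // prints "anonymous"
    }
}
```

Whether a fallback value is appropriate depends on the page being rendered; letting the exception propagate to the framework's error handling is an equally reasonable choice.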