java, spring, mono, spring-webflux, flux

Spring Reactive: how to wait for all Monos to finish?


I have the following code, where I call external APIs via WebClient and each call returns a Mono. I need to run some logic on each response as it arrives and, once all requests have completed, run one more step over all the gathered data. I could collect all the Monos into a Flux and run the final logic at the end, but the serviceName field is only accessible inside the loop, so the per-response logic has to run there. This is where I'm stuck: I don't know how to wait for all the data to arrive in a reactive way.

@Scheduled(fixedDelay = 50000)
public void refreshSwaggerConfigurations() {
  log.debug("Starting Service Definition Context refresh");
  List<SwaggerServiceData> allServicesApi = new ArrayList<>();

  swaggerProperties.getUrls().forEach((serviceName, serviceSwaggerUrl) -> {
    log.debug("Attempting service definition refresh for Service : {} ", serviceName);

    Mono<SwaggerServiceData> swaggerData = getSwaggerDefinitionForAPI(serviceName,
        serviceSwaggerUrl);

    swaggerData.subscribe(swaggerServiceData -> {
      if (swaggerServiceData != null) {
        allServicesApi.add(swaggerServiceData);
        String content = getJSON(swaggerServiceData);
        definitionContext.addServiceDefinition(serviceName, content);
      } else {
        log.error("Skipping service id : {} Error : Could not get Swagger definition from API ",
            serviceName);
      }
    });
  });
  // I need to wait here for all Monos to complete and then process all the gathered data.
  // The list is still empty at this point. I know why, but I don't know how to fix it.
  Optional<SwaggerServiceData> swaggerAllServicesData = getAllServicesApiSwagger(allServicesApi);
  if (swaggerAllServicesData.isPresent()) {
    String allApiContent = getJSON(swaggerAllServicesData.get());
    definitionContext.addServiceDefinition("All", allApiContent);
  }
}


private Mono<SwaggerServiceData> getSwaggerDefinitionForAPI(String serviceName, String url) {
  log.debug("Accessing the SwaggerDefinition JSON for Service : {} : URL : {} ", serviceName,
      url);
  Mono<SwaggerServiceData> swaggerServiceDataMono = webClient.get()
      .uri(url)
      .exchangeToMono(clientResponse -> clientResponse.bodyToMono(SwaggerServiceData.class));
  return swaggerServiceDataMono;
}


Solution

  • I would add a small helper class that groups the data with its service name:

    record SwaggerService(SwaggerServiceData swaggerServiceData, String serviceName) {
        boolean hasData() {
            return swaggerServiceData != null;
        }
    }
    

    Then change your pipeline:

    Flux.fromStream(swaggerProperties.getUrls().entrySet().stream())
                    .flatMap((e) -> {
                        Mono<SwaggerServiceData> swaggerDefinitionForAPI = getSwaggerDefinitionForAPI(e.getKey(),
                                e.getValue());
                        return swaggerDefinitionForAPI.map(swaggerServiceData -> new SwaggerService(swaggerServiceData, e.getKey()));
                    })
                    .filter(SwaggerService::hasData)
                    .map(swaggerService -> {
                        String content = getJSON(swaggerService.swaggerServiceData());
                        definitionContext.addServiceDefinition(swaggerService.serviceName(), content);
                        return swaggerService.swaggerServiceData();
                    })
                    // collectList() waits for all items and emits them as a single Mono holding a List<SwaggerServiceData>
                    .collectList()
                    .map(this::getAllServicesApiSwagger)
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .subscribe(e -> {
                        String allApiContent = getJSON(e);
                        definitionContext.addServiceDefinition("All", allApiContent);
                    });
    

    This version does not log an error when SwaggerServiceData is null, but you can extend it to do so. I also assume that DefinitionContext is thread-safe.
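
On the thread-safety assumption: flatMap subscribes to the inner Monos eagerly, so several requests can be in flight at once and their callbacks may run on different threads. The real DefinitionContext is not shown in the question. As a minimal sketch, assuming it is little more than a map from service name to JSON (the class body, getSwaggerDefinition and size below are hypothetical), a thread-safe version could look like this:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the DefinitionContext used in the question.
class DefinitionContext {

    // ConcurrentHashMap supports concurrent put/get without external locking.
    private final Map<String, String> definitions = new ConcurrentHashMap<>();

    void addServiceDefinition(String serviceName, String content) {
        definitions.put(serviceName, content);
    }

    // Returns the stored JSON, or null if the service is unknown.
    String getSwaggerDefinition(String serviceName) {
        return definitions.get(serviceName);
    }

    int size() {
        return definitions.size();
    }
}
```

If the real class keeps its state in a plain HashMap, concurrent writes from the pipeline could corrupt it, so this is worth checking before relying on flatMap's concurrency.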

    Solution with error logging (using flatMap and Mono.empty()):

            Flux.fromStream(swaggerProperties.getUrls().entrySet().stream())
                    .flatMap((e) -> {
                        Mono<SwaggerServiceData> swaggerDefinitionForAPI = getSwaggerDefinitionForAPI(e.getKey(),
                                e.getValue());
                        return swaggerDefinitionForAPI
                                .flatMap(swaggerServiceData -> {
                                    if(swaggerServiceData != null) {
                                        return Mono.just(new SwaggerService(swaggerServiceData, e.getKey()));
                                    } else {
                                        log.error("Skipping service id : {} Error : Could not get Swagger definition from API ",
                                                e.getKey());
                                        return Mono.empty();
                                    }
                                });
                    })
                    .map(swaggerService -> {
                        String content = getJSON(swaggerService.swaggerServiceData());
                        definitionContext.addServiceDefinition(swaggerService.serviceName(), content);
                        return swaggerService.swaggerServiceData();
                    }).collectList()
                    .map(this::getAllServicesApiSwagger)
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .subscribe(e -> {
                        String allApiContent = getJSON(e);
                        definitionContext.addServiceDefinition("All", allApiContent);
                    });
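
A caveat on the null check above: Reactive Streams does not allow null signals, so a Mono never emits null and the else branch is not expected to run. A response without a body typically shows up as an empty Mono, and a failed call as an error signal. Here is a sketch of logging those two cases instead. The fetch stub stands in for getSwaggerDefinitionForAPI, plain strings stand in for SwaggerServiceData, and the class and helper names are assumptions made for illustration:

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class EmptyAwareRefresh {

    // Stub for getSwaggerDefinitionForAPI: an empty URL simulates a response
    // without a body, a "bad:" URL simulates a failed call.
    static Mono<String> fetch(String serviceName, String url) {
        if (url.isEmpty()) {
            return Mono.empty();
        }
        if (url.startsWith("bad:")) {
            return Mono.error(new IllegalStateException("connection refused"));
        }
        return Mono.just("{\"service\":\"" + serviceName + "\"}");
    }

    // Logs the skipped service and contributes nothing to the final list.
    static Mono<String> logSkipped(String serviceName, String reason) {
        System.err.println("Skipping service id : " + serviceName + " Error : " + reason);
        return Mono.empty();
    }

    static Mono<List<String>> collectDefinitions(Map<String, String> urls) {
        return Flux.fromIterable(urls.entrySet())
                .flatMap(e -> fetch(e.getKey(), e.getValue())
                        // Mono.defer delays the logging until the source really completes empty.
                        .switchIfEmpty(Mono.defer(() -> logSkipped(e.getKey(), "empty response")))
                        // A failed call is logged and dropped so the other services still load.
                        .onErrorResume(ex -> logSkipped(e.getKey(), ex.getMessage())))
                .collectList();
    }
}
```

switchIfEmpty is placed before onErrorResume on purpose: in the opposite order, a failed call would be logged twice, once as an error and once as an empty result.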
    

    You can also wrap those lambdas in well-named methods to improve readability.
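
One possible shape for that refactoring is sketched below. The record types, the two stubbed methods and the map-based context are simplified stand-ins for the classes in the question, and fetchDefinition and registerDefinition are made-up names. The sketch also swaps the filter(Optional::isPresent)/map(Optional::get) pair for Mono.justOrEmpty and returns a Mono<Void> instead of subscribing, so the caller decides how to trigger it:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class SwaggerRefreshSketch {

    // Simplified stand-ins for the types in the question.
    record SwaggerServiceData(String json) {}
    record SwaggerService(SwaggerServiceData swaggerServiceData, String serviceName) {}

    // Stand-in for DefinitionContext: service name -> JSON.
    final Map<String, String> definitionContext = new ConcurrentHashMap<>();

    // Stub: the real method calls the remote API through WebClient.
    Mono<SwaggerServiceData> getSwaggerDefinitionForAPI(String serviceName, String url) {
        return Mono.just(new SwaggerServiceData("{\"source\":\"" + url + "\"}"));
    }

    // Stub: the real method merges the per-service definitions.
    Optional<SwaggerServiceData> getAllServicesApiSwagger(List<SwaggerServiceData> all) {
        return all.isEmpty()
                ? Optional.empty()
                : Optional.of(new SwaggerServiceData("{\"services\":" + all.size() + "}"));
    }

    // The pipeline reads as a list of steps; nothing runs until it is subscribed.
    Mono<Void> refresh(Map<String, String> urls) {
        return Flux.fromIterable(urls.entrySet())
                .flatMap(this::fetchDefinition)
                .map(this::registerDefinition)
                .collectList()
                .flatMap(all -> Mono.justOrEmpty(getAllServicesApiSwagger(all)))
                .doOnNext(all -> definitionContext.put("All", all.json()))
                .then();
    }

    // Pairs each response with the service name it belongs to.
    private Mono<SwaggerService> fetchDefinition(Map.Entry<String, String> entry) {
        return getSwaggerDefinitionForAPI(entry.getKey(), entry.getValue())
                .map(data -> new SwaggerService(data, entry.getKey()));
    }

    // Stores the per-service definition and passes the data on for aggregation.
    private SwaggerServiceData registerDefinition(SwaggerService service) {
        definitionContext.put(service.serviceName(), service.swaggerServiceData().json());
        return service.swaggerServiceData();
    }
}
```

In the scheduled method this could be triggered with refresh(swaggerProperties.getUrls()).subscribe(), or with block() if the next scheduled run should not start before the current one has finished.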