java reactive-programming spring-webflux project-reactor

How do I wait for multiple Monos to complete at once and get their values?


This is similar to "Waiting for running Reactor Mono instances to complete", but I want to get the results, ideally in another Mono. Here's the code I have. I tried the materialize solution, but that didn't pan out.

    @GetMapping("/bounced")
    public Mono<Map<String, Object>> bounced(
        @RequestHeader("X-B3-Traceid") String traceId,
        @RequestHeader(HttpHeaders.AUTHORIZATION) String authorization
    ) {

        final Mono<Map<String, Object>> sample = webClient.get()
            .uri("http://sample:8080/")
            .header(HttpHeaders.AUTHORIZATION, authorization)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

        final Mono<Map<String, Object>> httpGet = webClient.get()
            .uri("http://httpbin.org/get")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

        final Mono<Map<String, Object>> anything = webClient.get()
            .uri("http://httpbin.org/anything/foo")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

/*
   Tried this and it does start it up, but it triggers another "download" in the return block.

        Mono.when(anything, sample, httpGet)
            .subscribe();
            .materialize()
            .block();
*/
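        // Each block() subscribes to its Mono and waits for the result,
        // so the three calls run one after another.
        return Mono.just(Map.of("traceFromBounced", traceId,
            "anything", anything.block(),
            "sample", sample.block(),
            "httpGet", httpGet.block()));
    }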

Solution

  • Based on @K.Nicholas' comment, I got it working by combining the three Monos with Mono.zip:

        @GetMapping("/bounced")
        public Mono<Map<String, Object>> bounced(
            @RequestHeader("X-B3-Traceid") String traceId,
            @RequestHeader(HttpHeaders.AUTHORIZATION) String authorization
        ) {
    
            final Mono<Map<String, Object>> sample = webClient.get()
                .uri("http://sample:8080/")
                .header(HttpHeaders.AUTHORIZATION, authorization)
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<>() {
                });
    
            final Mono<Map<String, Object>> httpGet = webClient.get()
                .uri("http://httpbin.org/get")
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<>() {
                });
    
            final Mono<Map<String, Object>> anything = webClient.get()
                .uri("http://httpbin.org/anything/foo")
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<>() {
                });
    
            return Mono.zip(anything, sample, httpGet)
                .map(t -> Map.of("traceFromBounced", traceId,
                    "anything", t.getT1(),
                    "sample", t.getT2(),
                    "httpGet", t.getT3()));
    
        }
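
To try the same pattern outside a web application, here is a minimal sketch, assuming reactor-core is on the classpath. `ZipSketch`, `fakeCall`, and the 500 ms latencies are hypothetical stand-ins for the WebClient calls above, not part of the original code. `Mono.zip` subscribes to all three sources together, so the combined result should arrive after roughly the slowest call rather than after the sum of all three.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

public class ZipSketch {

    // Counts how many simulated calls were actually started (one per subscription).
    static final AtomicInteger subscriptions = new AtomicInteger();

    // Hypothetical stand-in for a WebClient call: a cold Mono that emits after a delay.
    static Mono<String> fakeCall(String name, Duration latency) {
        return Mono.defer(() -> {
            subscriptions.incrementAndGet();
            return Mono.just(name + "-response").delayElement(latency);
        });
    }

    // Same shape as the controller method: zip three Monos, then map the tuple to a Map.
    static Mono<Map<String, Object>> combined(String traceId) {
        Mono<String> anything = fakeCall("anything", Duration.ofMillis(500));
        Mono<String> sample = fakeCall("sample", Duration.ofMillis(500));
        Mono<String> httpGet = fakeCall("httpGet", Duration.ofMillis(500));

        return Mono.zip(anything, sample, httpGet)
            .map(t -> Map.<String, Object>of(
                "traceFromBounced", traceId,
                "anything", t.getT1(),
                "sample", t.getT2(),
                "httpGet", t.getT3()));
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // block() is acceptable here only because this is a plain main thread, not a request handler.
        Map<String, Object> result = combined("abc123").block();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println(result);
        System.out.println("elapsed ms: " + elapsedMs + ", subscriptions: " + subscriptions.get());
    }
}
```

If the sources really do run concurrently, the printed elapsed time should be much closer to 500 ms than to 1500 ms, and each source should be subscribed to exactly once.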
    

    Here's the Zipkin output, which shows that the three requests run in parallel.
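
As for why the commented-out `Mono.when(...)` attempt in the question triggered another "download": Monos like these are cold, so every subscription starts the work again. The sketch below illustrates that, assuming reactor-core is on the classpath. `ColdMonoSketch` and `fakeDownload` are hypothetical names, with a counter standing in for the HTTP request.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

public class ColdMonoSketch {

    // Counts how many times the simulated download actually ran.
    static final AtomicInteger downloads = new AtomicInteger();

    // Hypothetical stand-in for a WebClient call: the callable runs once per subscription.
    static Mono<String> fakeDownload() {
        return Mono.fromCallable(() -> "payload #" + downloads.incrementAndGet());
    }

    // Mirrors the question's attempt: wait with Mono.when, then block() again for the value.
    static String subscribeTwice() {
        Mono<String> source = fakeDownload();
        Mono.when(source).block();   // first subscription: runs the callable, discards the value
        return source.block();       // second subscription: runs the callable again
    }

    public static void main(String[] args) {
        String value = subscribeTwice();
        System.out.println(value + ", downloads = " + downloads.get());
    }
}
```

`Mono.when` only signals completion and drops the values, so it cannot hand the results on. `Mono.zip` avoids the second subscription because it both waits for the sources and carries their values in the resulting tuple.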