spring-webflux project-reactor reactor

Flux collectList() on list of WebClient exchanges always empty


I'm trying to execute a list of requests using WebClient, then filter the responses to find the first one that succeeded (if any) and return it, or fall back to a default response if none succeeded.

The problem I'm facing is that when I call .collectList() on a Flux<ServerResponse>, the list is always empty. I would have expected the list to contain N ServerResponse objects, one for each request I issued earlier.

public Mono<ServerResponse> retry(ServerRequest request) {
    return Flux.fromIterable(request.headers().header(SEQUENCE_HEADER_NAME))
            .map(URI::create)
            // Build a "list" of responses
            .flatMap(uri -> webClientBuilder.baseUrl(uri.toString()).build()
                    .method(Objects.requireNonNull(request.method()))
                    .headers(headers -> request.headers().asHttpHeaders().forEach((key, values) -> {
                        if (!SEQUENCE_HEADER_NAME.equals(key)) {
                            headers.addAll(key, values);
                        }
                    }))
                    .body(BodyInserters.fromDataBuffers(request.body(BodyExtractors.toDataBuffers())))
                    .exchange()
                    .flatMap(clientResponse -> ServerResponse.status(clientResponse.statusCode())
                            .headers(headers -> headers.addAll(clientResponse.headers().asHttpHeaders()))
                            .body(BodyInserters.fromDataBuffers(clientResponse.body(BodyExtractors.toDataBuffers()))))
            )
            // "Wait" for all of them to complete so we can filter
            .collectList()
            .flatMap(clientResponses -> {
                List<ServerResponse> filteredResponses = clientResponses.stream()
                        .filter(response -> response.statusCode().is2xxSuccessful())
                        .collect(Collectors.toList());

                if (filteredResponses.isEmpty()) {
                    log.error("No request succeeded; defaulting to {}", HttpStatus.BAD_REQUEST.toString());
                    return ServerResponse.badRequest().build();
                }

                if (filteredResponses.size() > 1) {
                    log.error("Multiple requests succeeded; defaulting to {}", HttpStatus.BAD_REQUEST.toString());
                    return ServerResponse.badRequest().build();
                }

                return Mono.just(filteredResponses.get(0));
            });
}

Any ideas why .collectList() always returns an empty list?


Solution

  • It seems to me that the requirement is confused: you want the first Mono that responds successfully, but you are trying to put that logic into a Flux, which is meant to process every item in the flow efficiently. A Mono in WebFlux is meant to build a flow that performs a series of transformations on a single item efficiently. Testing a bunch of URIs for the first one that succeeds is not what WebFlux is good at, so I have to question why you would force that into the framework.

    You might argue that a Flux gives you better asynchronous processing, but I don't think that is the case when the flow is a bunch of WebClient calls. WebClient is still HTTP under the hood, so each item in the flow stops and starts around its WebClient call. If you want to issue the HTTP requests asynchronously, I would use a thread pool and Callable tasks instead.
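    The thread pool and Callable suggestion could be sketched roughly as follows. This is only an illustration using the JDK: `SingleSuccessSelector`, `Result`, and `selectSingleSuccess` are hypothetical names, and each `Callable` stands in for one HTTP call (no real request is made here). The selection rule mirrors the one in the question: return the response only if exactly one call came back 2xx, otherwise fall back to 400.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SingleSuccessSelector {

    /** Hypothetical stand-in for an HTTP response: status code plus body. */
    static final class Result {
        final int status;
        final String body;

        Result(int status, String body) {
            this.status = status;
            this.body = body;
        }

        boolean isSuccessful() {
            return status >= 200 && status < 300;
        }
    }

    /** Default response, matching the 400 fallback used in the question. */
    static final Result FALLBACK = new Result(400, "");

    /**
     * Runs every call on a thread pool, waits for all of them to finish,
     * and returns the single 2xx result. Falls back to 400 when zero or
     * more than one call succeeded.
     */
    static Result selectSingleSuccess(List<Callable<Result>> calls) {
        if (calls.isEmpty()) {
            return FALLBACK; // newFixedThreadPool(0) would throw
        }
        ExecutorService pool = Executors.newFixedThreadPool(calls.size());
        List<Result> successes = new ArrayList<>();
        try {
            // invokeAll blocks until every task has completed or failed
            for (Future<Result> future : pool.invokeAll(calls)) {
                try {
                    Result result = future.get();
                    if (result != null && result.isSuccessful()) {
                        successes.add(result);
                    }
                } catch (ExecutionException e) {
                    // A call that threw is treated as a non-successful response
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return FALLBACK;
        } finally {
            pool.shutdown();
        }
        return successes.size() == 1 ? successes.get(0) : FALLBACK;
    }

    public static void main(String[] args) {
        // Each lambda simulates one request; a real version would do the HTTP call here
        List<Callable<Result>> calls = List.of(
                () -> new Result(500, "server error"),
                () -> new Result(200, "ok"),
                () -> { throw new IllegalStateException("connection refused"); });

        Result chosen = selectSingleSuccess(calls);
        System.out.println(chosen.status + " " + chosen.body); // prints "200 ok"
    }
}
```

    Note that this blocks the calling thread until all calls finish, so in a WebFlux handler it would have to run off the event loop. It shows the selection logic, not a drop-in replacement for the handler in the question.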