Prevent Spring WebFlux WebClient from performing new exchange upon new subscription


The DefaultWebClient has the exchange implemented as:

@Override
public Mono<ClientResponse> exchange() {
    ClientRequest request = (this.inserter != null ?
            initRequestBuilder().body(this.inserter).build() :
            initRequestBuilder().build());
    return Mono.defer(() -> exchangeFunction.exchange(request)
            .checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
            .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
}

As shown above, the exchangeFunction.exchange call is wrapped in Mono.defer, so it is executed again each time a subscriber subscribes to the returned Mono<ClientResponse>.

In my specific use case, however, I do not want the exchange to be re-executed. Simplified code:

final WebClient webClient = WebClient.create("http://some-base-url");
final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
Flux.fromIterable(Arrays.asList(1, 2, 3))
    .flatMap(num -> {
        if (...some condition...) {
            return responseRef.updateAndGet(response -> response == null 
                                                        ? webClient.get().uri("/some-path").exchange()
                                                        : response)
                              .flatMap(response -> {...do something with num and response...});
        } else {
            return Mono.just(...something...);
        }
    })
    ...

Here I use an AtomicReference to create the Mono<ClientResponse> lazily and then reuse it, so that the HTTP request is not made again and again.

This does not work as expected: every flatMap that subscribes to the Mono<ClientResponse> returned by exchange() triggers its internal exchangeFunction.exchange again, so the request is still sent once per subscription.
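
The effect can be reproduced without Spring or Reactor. The sketch below is a plain-Java model, not Reactor code: the hypothetical ColdSource class stands in for the deferred Mono, and its subscribe() method re-runs the supplied work on every call, roughly the way Mono.defer does. It suggests why holding the same instance in an AtomicReference is not enough to stop the work from repeating.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class ColdSourceDemo {

    /** Hypothetical stand-in for a deferred Mono: the work runs on every subscribe(). */
    static final class ColdSource<T> {
        private final Supplier<T> work;

        ColdSource(Supplier<T> work) {
            this.work = work;
        }

        T subscribe() {
            return work.get();
        }
    }

    /** Returns how many simulated exchanges ran for the given number of subscriptions. */
    static int exchangesFor(int subscriptions) {
        AtomicInteger exchanges = new AtomicInteger();
        AtomicReference<ColdSource<String>> ref = new AtomicReference<>();
        for (int i = 0; i < subscriptions; i++) {
            // Same lazy-initialisation pattern as in the question: only one
            // ColdSource instance is ever stored in the reference.
            ColdSource<String> source = ref.updateAndGet(current -> current == null
                    ? new ColdSource<String>(() -> "response #" + exchanges.incrementAndGet())
                    : current);
            // ...but each subscription still re-runs the work behind it.
            source.subscribe();
        }
        return exchanges.get();
    }

    public static void main(String[] args) {
        System.out.println(exchangesFor(3)); // prints 3
    }
}
```

The reference guarantees a single source object, but not a single execution: what is shared is the recipe for the request, not its result.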

Can I wrap the returned Mono<ClientResponse> in something that offsets the effect of Mono.defer? Or is there any way to work around it without changing the structure of my code?

========== Workable Solution ==========

Inspired by the accepted answer, I changed my code as follows, and it now works:

final WebClient webClient = WebClient.create("http://some-base-url");
final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
Flux.fromIterable(Arrays.asList(1, 2, 3))
    .flatMap(num -> {
        if (...some condition...) {
            return responseRef.updateAndGet(response -> response == null 
                                                        ? webClient.get().uri("/some-path").exchange().cache()
                                                        : response)
                              .flatMap(response -> {...do something with num and response...});
        } else {
            return Mono.just(...something...);
        }
    })
    ...

Note the cache() after exchange(). Calling cache() on the Mono turns it into a hot source that caches the last emitted signals and replays them to later subscribers. Completion and error signals are replayed as well.
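
To make the replay behaviour concrete, here is a minimal plain-Java model of it. This is not Reactor's implementation: CachedSource is a hypothetical, blocking stand-in for a cached Mono. It runs the work on the first subscribe() and afterwards hands every caller the same outcome, whether that was a value or an exception.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CachedSourceDemo {

    /** Hypothetical stand-in for mono.cache(): run the work once, then replay the outcome. */
    static final class CachedSource<T> {
        private final Supplier<T> work;
        private boolean done;
        private T value;
        private RuntimeException error;

        CachedSource(Supplier<T> work) {
            this.work = work;
        }

        synchronized T subscribe() {
            if (!done) {
                try {
                    value = work.get();   // first subscriber triggers the work
                } catch (RuntimeException e) {
                    error = e;            // failures are remembered too
                }
                done = true;
            }
            if (error != null) {
                throw error;              // replay the cached failure
            }
            return value;                 // replay the cached value
        }
    }

    /** Number of simulated exchanges that ran for the given number of subscriptions. */
    static int exchangesFor(int subscriptions) {
        AtomicInteger exchanges = new AtomicInteger();
        CachedSource<String> cached =
                new CachedSource<String>(() -> "response #" + exchanges.incrementAndGet());
        for (int i = 0; i < subscriptions; i++) {
            cached.subscribe();
        }
        return exchanges.get();
    }

    /** Number of attempts made when the simulated exchange always fails. */
    static int failingAttemptsFor(int subscriptions) {
        AtomicInteger attempts = new AtomicInteger();
        CachedSource<String> cached = new CachedSource<String>(() -> {
            attempts.incrementAndGet();
            throw new IllegalStateException("simulated failure");
        });
        for (int i = 0; i < subscriptions; i++) {
            try {
                cached.subscribe();
            } catch (IllegalStateException expected) {
                // every subscriber sees the same cached failure
            }
        }
        return attempts.get();
    }
}
```

In this model the failing case also runs only once. The practical consequence of replaying errors is that a failed request stays failed for every later subscriber, which is worth keeping in mind when caching an HTTP exchange.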


Solution

  • You can do something like this:

    final WebClient webClient = WebClient.create("http://localhost:8080");
    Flux<String> data = webClient
                    .get()
                    .uri("test")
                    .exchange()
                    //do whatever you need on response
                    .flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
                    .flux()
                    //Turn this Flux into a hot source and cache last emitted signals for further Subscriber
                    .replay()
                    //Connects this ConnectableFlux to the upstream source when the first Subscriber subscribes.
                    .autoConnect();

    Alternatively, keep it as a Mono and use cache():

    Mono<String> data = webClient
                    .get()
                    .uri("test")
                    .exchange()
                    .flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
                    //Run the exchange once and replay its result to every later Subscriber
                    .cache();

    Flux.range(0, 10).flatMap(integer -> {
        if (integer % 2 == 0)
            return data;
        else
            return Mono.empty();
    }).log().subscribe();