The DefaultWebClient has exchange() implemented as:
    @Override
    public Mono<ClientResponse> exchange() {
        ClientRequest request = (this.inserter != null ?
                initRequestBuilder().body(this.inserter).build() :
                initRequestBuilder().build());
        return Mono.defer(() -> exchangeFunction.exchange(request)
                .checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
                .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
    }
As you can see above, the exchangeFunction.exchange call is wrapped in Mono.defer, so it is executed every time something subscribes to the returned Mono<ClientResponse>.
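To make the effect of Mono.defer concrete, here is a minimal sketch that needs only reactor-core on the classpath. The fakeExchange() method and the DeferDemo class are hypothetical stand-ins for exchangeFunction.exchange(request), not Spring code; the counter simply records how often the "request" is actually issued.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class DeferDemo {
    // Counts how many times the simulated HTTP call is actually made.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for exchangeFunction.exchange(request):
    // the side effect happens as soon as this method is invoked.
    static Mono<String> fakeExchange() {
        int n = calls.incrementAndGet();
        return Mono.just("response #" + n);
    }

    // Mirrors the shape of exchange(): the call is postponed until
    // subscription and repeated for every new subscriber.
    static Mono<String> deferred() {
        return Mono.defer(DeferDemo::fakeExchange);
    }

    public static void main(String[] args) {
        Mono<String> response = deferred();
        System.out.println("calls before subscribing: " + calls.get());
        System.out.println(response.block()); // first subscription
        System.out.println(response.block()); // second subscription calls again
        System.out.println("calls after two subscriptions: " + calls.get());
    }
}
```

Holding on to the same Mono instance does not help here: the instance is only a description of the work, and each subscription runs that description again.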
However, in my very specific use case, I do not want the exchange to be re-executed. Here is a simplified version of the code:
    final WebClient webClient = WebClient.create("http://some-base-url");
    final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
    Flux.fromIterable(Arrays.asList(1, 2, 3))
        .flatMap(num -> {
            if (...some condition...) {
                return responseRef.updateAndGet(response -> response == null
                        ? webClient.get().uri("/some-path").exchange()
                        : response)
                    .flatMap(response -> {...do something with num and response...});
            } else {
                return Mono.just(...something...);
            }
        })
        ...
As you can see above, I tried to use an AtomicReference to obtain the Mono<ClientResponse> lazily so that the HTTP request would not be made again and again.
This does not work as expected: each time the do-something-with-num-and-response flatMap subscribes to the Mono<ClientResponse> published by exchange(), it triggers the internal exchangeFunction.exchange call again.
Can I wrap the published Mono<ClientResponse> with something to offset the effect of Mono.defer? Or is there any way to work around it without changing the structure of my code?
========== Workable Solution ==========
Inspired by the accepted answer, I changed my code as follows to make it work:
    final WebClient webClient = WebClient.create("http://some-base-url");
    final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
    Flux.fromIterable(Arrays.asList(1, 2, 3))
        .flatMap(num -> {
            if (...some condition...) {
                return responseRef.updateAndGet(response -> response == null
                        ? webClient.get().uri("/some-path").exchange().cache()
                        : response)
                    .flatMap(response -> {...do something with num and response...});
            } else {
                return Mono.just(...something...);
            }
        })
        ...
Pay attention to the cache() after the exchange(). Calling cache() on the Mono turns it into a hot source and caches the last emitted signal for further subscribers. Completion and error signals are also replayed.
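The difference cache() makes can be checked with a small counter-based sketch, again assuming only reactor-core. The fakeExchange() method and the CacheDemo class are hypothetical; a real WebClient call would sit where the counter is incremented.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CacheDemo {
    // Counts how many times the simulated HTTP call is actually made.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for webClient.get().uri(...).exchange():
    // deferred, so every subscription issues a new "request".
    static Mono<String> fakeExchange() {
        return Mono.defer(() -> Mono.just("response #" + calls.incrementAndGet()));
    }

    // Subscribes to the given source once per number, as the flatMap
    // in the question does.
    static List<String> run(Mono<String> source) {
        return Flux.range(1, 3)
                .flatMap(num -> source.map(body -> num + ":" + body))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        run(fakeExchange());
        System.out.println("without cache(): " + calls.get() + " call(s)");

        calls.set(0);
        run(fakeExchange().cache());
        System.out.println("with cache(): " + calls.get() + " call(s)");
    }
}
```

Because cache() also replays errors, a failed first request would be handed to every later subscriber as the same failure; whether that is acceptable depends on the use case.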
You can do something like this:
    final WebClient webClient = WebClient.create("http://localhost:8080");
    Flux<String> data = webClient
            .get()
            .uri("test")
            .exchange()
            // do whatever you need on response
            .flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
            .flux()
            // Turn this Flux into a hot source and cache last emitted signals for further Subscriber
            .replay()
            // Connects this ConnectableFlux to the upstream source when the first Subscriber subscribes.
            .autoConnect();
    Flux.range(0, 10).flatMap(integer -> data).log().subscribe();
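The same pattern can be tried without a server. The sketch below assumes only reactor-core and replaces the WebClient call with a hypothetical counting supplier (the ReplayDemo class and shared() method are illustrative names), so the number of upstream calls can be observed directly.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReplayDemo {
    // Counts how many times the simulated upstream request runs.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for the WebClient pipeline above:
    // replay() buffers the emitted signals, autoConnect() subscribes
    // upstream once, when the first subscriber arrives.
    static Flux<String> shared() {
        return Mono.fromSupplier(() -> "body #" + calls.incrementAndGet())
                .flux()
                .replay()
                .autoConnect();
    }

    public static void main(String[] args) {
        Flux<String> data = shared();
        List<String> results = Flux.range(0, 10)
                .flatMap(i -> data)
                .collectList()
                .block();
        System.out.println(results.size() + " results, " + calls.get() + " upstream call(s)");
    }
}
```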
You can do:
    Mono<String> data = webClient
            .get()
            .uri("test")
            .exchange()
            .flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
            .cache();
    Flux.range(0, 10).flatMap(integer -> {
        if (integer % 2 == 0)
            return data;
        else
            return Mono.empty();
    }).log().subscribe();