Search code examples
cachingmonoreactive-programmingspring-webfluxproject-reactor

Caching parallel request in Spring Webflux Mono


We are using spring webflux (project reactor), as part of the requirement we need to call one API from our server.

For the API call, we need to cache the response. So we are using Mono.cache operator.

It caches the response Mono<ResponseDto> and the next time the same API call happens, it will get it from the cache. Following is example implementation

public Mono<ResponseDto> getResponse() {
    if (res == null) {
      res =
          fetchResponse()
              .onErrorMap(Exception.class, (error) -> new CustomException())
              .cache(
                  r -> Duration.ofSeconds(r.expiresIn()),
                  error -> Duration.ZERO,
                  () -> Duration.ZERO);
    }
    return res;
  }

The problem is if the server calls the same API call twice ( for example Mono.zip) at the same time, then the response is not cached and we actually call it twice.

Is there any out of box solution available to this problem? Instead of caching the Response, can we cache the Mono itself so that both requests subscribe to the same Mono hence both are executed after a Single API call response?

It should also work with sequential execution too - I am afraid that if we cache the Mono then once the request is completed, the subscription is over and no other process can subscribe to it.

Cases


Solution

  • Project Reactor provides a cache utility CacheMono that is non-blocking but can stampede.

    AsyncCache will be better integration, for the first lookup with key "K" will result in a cache miss, it will return a CompletableFuture of the API call and for the second lookup with the same key "K" will get the same CompletableFuture object.

    The returned future object can be converted to/from Mono with Mono.fromFuture()

     public Mono<ResponseData> lookupAndWrite(AsyncCache<String, ResponseData> cache, String key) {
    return Mono.defer(
        () ->
            Mono.fromFuture(
                cache.get(
                    key,
                    (searchKey, executor) -> {
                      CompletableFuture<ResponseData> future = callAPI(searchKey).toFuture();
                      return future.whenComplete(
                          (r, t) -> {
                            if (t != null) {
                              cache.synchronous().invalidate(key);
                            }
                          });
                    })));}