spring, spring-webflux, netty, reactor

What is the best way to invalidate a cached value of Mono?


I'll try to explain my use case as simply as possible. Before calling an external web service, my application requests an OAuth access token from a token provider using the Spring WebFlux WebClient, and caches the value so it is reused the next time I need it. Whenever a request fails with error code 401, I want to invalidate the cache and retry with a new token.

Everything works fine, except that I'm not sure about the cache invalidation of the Mono.

Is it a good idea to store in a boolean variable whether the cache needs to be invalidated? On retry, when the auth Mono is subscribed to again, the cacheInvalidateIf predicate returns true, which invalidates the cache and triggers a new request to the token provider.

Any other ideas?

Sample code:

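import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

@Component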
public class AccessTokenService {

    private final WebClient accessTokenClient;
    private final Mono<Token> accessTokenMono;
    private final AtomicBoolean cacheInvalidated = new AtomicBoolean();

    public AccessTokenService() {
        accessTokenClient = WebClient.builder()
                .baseUrl("example-token-uri")
                .build();

        accessTokenMono = initAccessTokenMono();
    }

    private Mono<Token> initAccessTokenMono() {
        return accessTokenClient.post()
                .uri(uriBuilder -> uriBuilder
                        // Some credentials as parameters
                        .build())
                .retrieve()
                .bodyToMono(Token.class)
                // Always set cache invalidated to false when we get the response
                .doOnSuccess(token -> cacheInvalidated.set(false))
                .cacheInvalidateIf(accessToken -> cacheInvalidated.get());
    }

    public Mono<Token> getAccessTokenMono() {
        // Shared for all requests
        return accessTokenMono;
    }

    // Called on 401
    public void setCacheInvalidated() {
        // Sets the predicate to true. For the next subscription the cache will be invalidated and a new request to the token provider will occur
        cacheInvalidated.set(true);
    }
}


Solution

  • Instead of trying to keep a dirty flag in sync with the validity of the Mono, why not simply swap in a new Mono when you detect a 401? I also have it refresh the token periodically via the TTL-based cache() overload, but that isn't required for what you're trying to do.

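    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicReference;

    import org.springframework.stereotype.Component;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.core.publisher.Mono;

    @Component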
    public class AccessTokenService {
        private static final Duration TOKEN_REFRESH_INTERVAL = Duration.ofHours(1);
        private final WebClient accessTokenClient;
        private final AtomicReference<Mono<Token>> accessTokenMono = new AtomicReference<>();
    
        public AccessTokenService() {
            accessTokenClient = WebClient.builder()
                .baseUrl("example-token-uri")
                .build();
    
            accessTokenMono.set(initAccessTokenMono());
        }
    
        private Mono<Token> initAccessTokenMono() {
            return accessTokenClient.post()
                .uri(uriBuilder -> uriBuilder
                    // Some credentials as parameters
                    .build())
                .retrieve()
                .bodyToMono(Token.class)
                .cache(token -> TOKEN_REFRESH_INTERVAL,         // refresh every interval
                    exc -> Duration.ZERO, () -> Duration.ZERO); // don't cache exceptions or empties
        }
    
        public Mono<Token> getAccessTokenMono() {
            // Shared for all requests
            return accessTokenMono.get();
        }
    
        // Called on 401
        public void setCacheInvalidated() {
            // Plug in a new Mono so subsequent requests get a new token
            accessTokenMono.set(initAccessTokenMono());
        }
    }
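
One thing the swap approach leaves open is what happens when several requests hit a 401 at the same time: each would call setCacheInvalidated() and plug in its own new Mono. A possible refinement is to replace the cached value only if it is still the one that produced the rejected token. The sketch below illustrates that idea with JDK types only, using CompletableFuture as a stand-in for the cached Mono so it runs without Reactor on the classpath. The class name SwappableTokenCache, the fetcher supplier, and the String token type are all hypothetical and not part of the code above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// JDK-only illustration: CompletableFuture plays the role of the cached Mono<Token>.
class SwappableTokenCache {
    // Hypothetical stand-in for the WebClient call to the token provider.
    private final Supplier<CompletableFuture<String>> fetcher;
    private final AtomicReference<CompletableFuture<String>> current = new AtomicReference<>();

    SwappableTokenCache(Supplier<CompletableFuture<String>> fetcher) {
        this.fetcher = fetcher;
    }

    // Returns the cached future; fetches lazily on first use or after an invalidation.
    CompletableFuture<String> get() {
        while (true) {
            CompletableFuture<String> cached = current.get();
            if (cached != null) {
                return cached;
            }
            CompletableFuture<String> fresh = new CompletableFuture<>();
            // Only the thread that wins this swap issues the token request.
            if (current.compareAndSet(null, fresh)) {
                fetcher.get().whenComplete((token, error) -> {
                    if (error != null) {
                        // Do not keep failures in the cache.
                        current.compareAndSet(fresh, null);
                        fresh.completeExceptionally(error);
                    } else {
                        fresh.complete(token);
                    }
                });
                return fresh;
            }
        }
    }

    // Clears the cache only if `stale` is still the cached future,
    // so several 401s caused by the same token trigger a single refresh.
    boolean invalidate(CompletableFuture<String> stale) {
        return stale != null && current.compareAndSet(stale, null);
    }
}
```

In the Reactor version, the same guard could presumably be expressed as accessTokenMono.compareAndSet(staleMono, initAccessTokenMono()), with the caller passing in the Mono it obtained before the failed request. Either way, the retry has to call getAccessTokenMono() again (for example inside Mono.defer) rather than resubscribe to the Mono it already holds, otherwise it keeps seeing the old token.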