I'll try to explain my use case as simply as possible. Before calling an external web service from my application, I request an OAuth access token from a token provider using the Spring WebFlux WebClient, and I cache the value so it can be reused the next time I need it. Whenever a request fails with a 401 status code, I want to invalidate the cache and retry with a new token.
Everything works fine, except that I'm not sure about the cache invalidation of the Mono.
Is it a good idea to store in a boolean variable whether the cache needs to be invalidated? On retry, when the auth Mono is subscribed to again, the cache-invalidation predicate will return true, which invalidates the cache and triggers a new request to the token provider.
Any other ideas?
Sample code:
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

@Component
public class AccessTokenService {

    private final WebClient accessTokenClient;
    private final Mono<Token> accessTokenMono;
    private final AtomicBoolean cacheInvalidated = new AtomicBoolean();

    public AccessTokenService() {
        accessTokenClient = WebClient.builder()
                .baseUrl("example-token-uri")
                .build();
        accessTokenMono = initAccessTokenMono();
    }

    private Mono<Token> initAccessTokenMono() {
        return accessTokenClient.post()
                .uri(uriBuilder -> uriBuilder
                        // Some credentials as parameters
                        .build())
                .retrieve()
                .bodyToMono(Token.class)
                // Always set cache invalidated to false when we get the response
                .doOnSuccess(token -> cacheInvalidated.set(false))
                .cacheInvalidateIf(accessToken -> cacheInvalidated.get());
    }

    public Mono<Token> getAccessTokenMono() {
        // Shared for all requests
        return accessTokenMono;
    }

    // Called on 401
    public void setCacheInvalidated() {
        // Sets the predicate to true. On the next subscription the cache is
        // invalidated and a new request to the token provider occurs
        cacheInvalidated.set(true);
    }
}
Instead of attempting to keep the state of a dirty flag in sync with the validity of the Mono, why not simply swap in a new Mono when you detect a 401? I'm also having it refresh the token periodically via cache(), but that's not explicitly required for what you're trying to do.
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

@Component
public class AccessTokenService {

    private static final Duration TOKEN_REFRESH_INTERVAL = Duration.ofHours(1);

    private final WebClient accessTokenClient;
    private final AtomicReference<Mono<Token>> accessTokenMono = new AtomicReference<>();

    public AccessTokenService() {
        accessTokenClient = WebClient.builder()
                .baseUrl("example-token-uri")
                .build();
        accessTokenMono.set(initAccessTokenMono());
    }

    private Mono<Token> initAccessTokenMono() {
        return accessTokenClient.post()
                .uri(uriBuilder -> uriBuilder
                        // Some credentials as parameters
                        .build())
                .retrieve()
                .bodyToMono(Token.class)
                .cache(token -> TOKEN_REFRESH_INTERVAL, // refresh every interval
                        exc -> Duration.ZERO, () -> Duration.ZERO); // don't cache exceptions or empties
    }

    public Mono<Token> getAccessTokenMono() {
        // Shared for all requests
        return accessTokenMono.get();
    }

    // Called on 401
    public void setCacheInvalidated() {
        // Plug in a new Mono so subsequent requests get a new token
        accessTokenMono.set(initAccessTokenMono());
    }
}
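One thing to watch with the swap approach: if several in-flight requests get a 401 at about the same time, each of them calls setCacheInvalidated() and each replaces the reference, so the token provider may be hit more than once. A possible refinement is to let the caller pass in the instance it actually used and swap only if that instance is still the current one, via AtomicReference.compareAndSet. The sketch below shows the idea with plain JDK types so it runs without Spring or Reactor; SwappableTokenCache, its fetcher parameter, and the String token are hypothetical stand-ins, not part of the service above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for the token holder: CompletableFuture<String>
// plays the role of the cached Mono<Token>.
final class SwappableTokenCache {

    private final Supplier<CompletableFuture<String>> fetcher;
    private final AtomicReference<CompletableFuture<String>> current = new AtomicReference<>();

    SwappableTokenCache(Supplier<CompletableFuture<String>> fetcher) {
        this.fetcher = fetcher;
        this.current.set(fetcher.get());
    }

    // Shared for all requests
    CompletableFuture<String> get() {
        return current.get();
    }

    // Called on 401 with the instance the failed request used.
    // Returns true only for the caller that actually performed the swap.
    boolean invalidate(CompletableFuture<String> stale) {
        CompletableFuture<String> fresh = new CompletableFuture<>();
        if (!current.compareAndSet(stale, fresh)) {
            // Another caller already replaced the stale instance; reuse theirs.
            return false;
        }
        // Only the winner of the swap triggers a new fetch.
        fetcher.get().whenComplete((token, error) -> {
            if (error != null) {
                fresh.completeExceptionally(error);
            } else {
                fresh.complete(token);
            }
        });
        return true;
    }
}
```

Because a Mono built from WebClient does nothing until it is subscribed to, the Reactor version should not need the placeholder future: assuming setCacheInvalidated is changed to accept the stale Mono as a parameter, its body could be reduced to accessTokenMono.compareAndSet(stale, initAccessTokenMono()).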