I'm a bit new to reactive programming, and I'm trying to assemble the following: using Java, Spring Boot 2, WebFlux, and reactor-core, I want to handle very specific requests that need extra authentication. So I'm implementing a WebFilter with a series of steps, the first of which calls the accessPointService.getAccessPointAuthorizationRequirement method (returns a Mono with a Boolean).
The filter:
    @Override
    public Mono<Void> filter(final ServerWebExchange serverWebExchange, final WebFilterChain webFilterChain) {
        //client for specific requests.
        WebClient webClient = WebClient.builder()
                .baseUrl("http://localhost:8080")
                .build();
        //get request for the CSRF cookie.
        WebClient.RequestHeadersSpec<?> getRequest = webClient.get()
                .uri("/login");
        //post request for the spring security session cookie.
        WebClient.RequestHeadersSpec<?> postRequest = webClient.post()
                .uri("/login")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED_VALUE)
                .body(BodyInserters.fromFormData("username", "username")
                        .with("password", "password"));
        //service that checks if the given request needs extra authentication
        return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), serverWebExchange.getRequest().getPath().toString())
                .log()
                //gets the csrf token from the GET request
                .flatMap(isRequired -> getRequest.exchangeToMono(response -> Mono.just(response.cookies().getFirst("XSRF-TOKEN").getValue())))
                //combines the previous token with the POST request SESSION cookie,
                //THEN secures the last request with both credentials
                .zipWith(postRequest.exchangeToMono(resp -> Mono.just(resp.cookies().getFirst("SESSION").getValue())),
                        AuthenticationFilter::secureAuthRequest)
                //gets the exchange from the request and converts the body into a String
                .flatMap(AuthenticationFilter::getRequestExchange)
                //code to validate if it's doing something. Not implemented yet because it never executes.
                .flatMap(s -> Mono.just(s.equals("")))
                .onErrorResume(e -> {
                    throw (CustomException) e;//breaks the execution
                })
                .then(webFilterChain.filter(serverWebExchange));//continues the execution
    }
The secureAuthRequest and getRequestExchange methods invoked:
    //adds the springsession cookie and csrf cookie to the request
    private static WebClient.RequestHeadersSpec<?> secureAuthRequest(String csrf, String spring) {
        WebClient webClient = WebClient.builder()
                .baseUrl("http://localhost:8080")
                .build();
        WebClient.RequestHeadersSpec<?> request = webClient.post()
                .uri("/authcheck")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
        request.header("X-XSRF-TOKEN", csrf);
        request.cookies(cookies -> cookies.add("XSRF-TOKEN", csrf));
        request.header("Authorization", spring);
        return request;
    }

    //gets the body as string.
    private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {
        return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
    }
However, when a request is bound to be authenticated, the log is the following:
2021-10-26 23:57:18.760 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-10-26 23:57:18.761 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | request(unbounded)
2021-10-26 23:57:18.761 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | onNext(true)
2021-10-26 23:57:18.762 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | onComplete()
As far as I know, the stream of data starts with a subscription and a subsequent request (which I think returns TRUE from the Mono of the accessPointService.getAccessPointAuthorizationRequirement method; if I'm wrong please correct me), but then the 'onComplete()' log shows up. I don't know exactly what the onComplete() log means, since it's being shown before the execution of the getRequestExchange method (which is invoked). The Mono.just(s.equals("")) piece of code never executes.
I've read a lot about how 'nothing happens until you subscribe', but I still don't know why the reactive flow is being invoked at all if I never explicitly subscribe to the stream, nor do I know how to subscribe myself, since subscribe() only returns a Disposable (I guess I can throw exceptions from within?). Also, I've heard about decoupling when multiple subscribers are involved, so I tried to avoid them as much as possible.
Any help regarding reactive programming, reactor-core, or the specific flow and how to improve it is appreciated.
Cheers.
So after some research, and thanks to @Toerktumlare's comments, I figured out what was happening. Here is what I found and what I changed.
As for the 'onComplete()' log, it marks the end of a producer of data. The single .log() in my filter only traced the first producer, which is why onComplete() showed up so early. To see the full trace of the operation, I needed to chain each producer with its own log. For example:
    Mono.just(Boolean.FALSE)
            .log()
            .flatMap(booleanVal -> Mono.just(booleanVal.toString()))
            .log()
            .subscribe(stringVal -> System.out.println("This is the boolean value " + stringVal));
That will produce the trace for the initial producer and the flatMap operation.
Now, onto the main problem: the issue was within the getRequestExchange method:
    //gets the body as string.
    private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {
        return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
    }
The problem was hidden in the bodyToMono method. According to this site https://medium.com/@jeevjyotsinghchhabda/dont-let-webclient-s-bodytomono-trick-you-645123b3e0a9 , if the response to this request has no body for whatever reason, bodyToMono will not throw any error, but just return a Mono.empty(). Since the flow was not prepared for an empty producer, the steps after it never received a value and the flow ended right there.
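To make the empty-producer behaviour concrete, here is a minimal sketch using only reactor-core. The class and method names are made up for illustration, and emptyBody() is just a stand-in for what bodyToMono(String.class) yields on a response without a body; no real HTTP call is made. It shows that a flatMap after an empty Mono is silently skipped while a later then(...) still runs, and two ways to surface the empty case as an error instead.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;

public class EmptyMonoDemo {

    // Hypothetical stand-in for bodyToMono(String.class) on a body-less response.
    static Mono<String> emptyBody() {
        return Mono.empty();
    }

    // Reports whether the flatMap lambda was invoked for an empty source.
    static boolean flatMapRan() {
        AtomicBoolean ran = new AtomicBoolean(false);
        emptyBody()
                .flatMap(body -> {
                    ran.set(true); // never reached: there is no value to map
                    return Mono.just(body.isEmpty());
                })
                // then(...) only waits for completion, so it still executes
                .then(Mono.just("chain continues"))
                .block();
        return ran.get();
    }

    // single() turns "completed without a value" into a NoSuchElementException.
    static String withSingle() {
        return emptyBody()
                .single()
                .onErrorResume(NoSuchElementException.class, e -> Mono.just("no body"))
                .block();
    }

    // switchIfEmpty lets you choose the error (or a fallback value) yourself.
    static String withSwitchIfEmpty() {
        return emptyBody()
                .switchIfEmpty(Mono.defer(() ->
                        Mono.<String>error(new IllegalStateException("empty response body"))))
                .onErrorResume(IllegalStateException.class, e -> Mono.just(e.getMessage()))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(flatMapRan());
        System.out.println(withSingle());
        System.out.println(withSwitchIfEmpty());
    }
}
```

The block() calls are only there to make the sketch runnable from main; inside a WebFilter you would return the Mono and let WebFlux subscribe to it.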
In my case, the root cause was Spring Cloud Security. I provided the Authorization credential, but not the associated SESSION cookie in the request. So the request returned a 302 (Found) without a body. That was the problem (not the reactive flow itself).
So, after that, I modified the request, and @Toerktumlare 's comments helped me develop a working solution:
    //service that returns whether a certain resource needs authentication or not, or if it's not even configured
    return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), FWKUtils.translateAccessPointPath(serverWebExchange.getRequest().getPath().pathWithinApplication().elements()))
            //if the response is an empty Mono, then returns a not acceptable exception
            .switchIfEmpty(Mono.defer(() -> throwNotAcceptable(serverWebExchange)))
            //takes the boolean value to check if extra auth is needed.
            .flatMap(isRequired -> validateAuthenticationRequirement(isRequired))
            //gets the access token - the extra auth credential
            .flatMap(isRequired -> getHeaderToken(serverWebExchange))
            //from this access token generates a request to the specific authentication service - the WebClient comes from a webClientProvider to not create too many WebClients.
            .flatMap(accessToken -> generateAuthenticationRequest(webClientProvider.getInstance(), accessToken))
            //gets the CSRF token credential and secures the request (adds it to the header and the cookies)
            .zipWith(getCredential(webClientProvider.getInstance(), "csrf"), (securedRequest, csrfToken) -> secureAuthenticationRequest(securedRequest, csrfToken, "X-XSRF-TOKEN", "XSRF-TOKEN"))
            //gets the SESSION (spring cloud security) token credential and secures the request (adds it to the header and the cookies)
            .zipWith(getCredential(webClientProvider.getInstance(), "spring-cloud"), (securedRequest, sessionToken) -> secureAuthenticationRequest(securedRequest, sessionToken, "Authorization", "SESSION"))
            //does the request and gets the response
            .map(requestBodySpecs -> requestBodySpecs.retrieve())
            //from the response, maps it to a specific DTO. The single() clause is to validate that a body is present.
            .flatMap(clientResponse -> clientResponse.bodyToMono(SecurityCredentialResponseDTO.class).single())
            //checks the authentication and throws an Unauthorized status if it's not valid.
            .flatMap(responseDTO -> checkTokenAuthentication(serverWebExchange, responseDTO))
            //if an error is present, then throws it
            .onErrorResume(e -> {
                if (e instanceof FWKException.GenericException) {
                    throw (FWKException.GenericException) e;
                }
                throw (RuntimeException) e;
            })
            //finally, continues the execution if no exception was thrown.
            .then(webFilterChain.filter(serverWebExchange));
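A side note on the last two steps, as a minimal reactor-core sketch rather than a change to the filter above: an error signal already travels down the chain on its own, and then(...) does not subscribe to its argument when the source fails. So the "continue only if no exception was thrown" behaviour holds even without a rethrowing onErrorResume. The class name, the run method and the "unauthorized" message are invented for the example; next stands in for webFilterChain.filter(serverWebExchange).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;

public class ErrorPropagationDemo {

    // Runs a tiny chain and reports whether the "rest of the filter chain" was reached.
    static String run(boolean fail) {
        AtomicBoolean continued = new AtomicBoolean(false);
        // Hypothetical stand-in for webFilterChain.filter(serverWebExchange).
        Mono<Void> next = Mono.fromRunnable(() -> continued.set(true));
        try {
            Mono.just(fail)
                    // Stand-in for an authentication check that may signal an error.
                    .flatMap(f -> f
                            ? Mono.<String>error(new IllegalStateException("unauthorized"))
                            : Mono.just("ok"))
                    // Not subscribed when the upstream signalled an error.
                    .then(next)
                    .block();
        } catch (IllegalStateException e) {
            return "error=" + e.getMessage() + ", continued=" + continued.get();
        }
        return "continued=" + continued.get();
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

If the goal is to translate one exception type into another rather than rethrow it, onErrorMap is the operator reactor-core provides for that.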
There's a bit more that I implemented in this solution (storing the CSRF and spring-cloud credentials to avoid unnecessary calls).
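The post does not show how the credentials were stored, so the following is only one possible approach, not the actual implementation: reactor-core's Mono.cache(Duration) replays the first result to later subscribers until the time-to-live expires. The class name, the fake token supplier and the five-minute TTL are assumptions for the sketch; in the real filter the source would be the WebClient call that fetches the credential.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class CredentialCacheDemo {

    // Counts how often the (fake) remote call runs when the cached Mono is read twice.
    static int remoteCallsForTwoReads() {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical stand-in for the request that fetches the CSRF token.
        Mono<String> fetchToken = Mono.fromSupplier(() -> "token-" + calls.incrementAndGet());
        // Assumed TTL: the first result is replayed for five minutes.
        Mono<String> cachedToken = fetchToken.cache(Duration.ofMinutes(5));

        cachedToken.block(); // triggers the supplier
        cachedToken.block(); // served from the cache
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(remoteCallsForTwoReads());
    }
}
```

For the caching to have any effect, the cached Mono has to be kept in a field that lives across requests instead of being rebuilt inside filter().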