Search code examples
javareactive-programmingspring-webfluxproject-reactor

Reactive Programming - Webflux Webfilter not behaving properly


I'm a bit new to reactive programming, and I'm trying to assemble the following: using Java, Springboot 2, Webflux, and reactor core, I want to handle very specific requests that need extra authentication. So I'm implementing a WebFilter with a series of steps:

  • Capture the path and the method of the request. Check if the combination exists and needs specific authentication with the accessPointService.getAccessPointAuthorizationRequirement method (returns a Mono with a Boolean).
  • Since I have CSRF and Spring security configured, I need both csrf token and springsession credentials. I make a GET and a POST request for the credentials.
  • Then with the credentials, I simply make a POST request to a service (authcheck) that can do a series of security checks (the service is OK, works fine from Postman and Angular).
  • After that, I need to retrieve the body, convert it to String, and inspect it. Right now this does not happen.

The filter

@Override
    public Mono<Void> filter(final ServerWebExchange serverWebExchange, final WebFilterChain webFilterChain) {

        //client for specific requests.
        WebClient webClient = WebClient.builder()
                .baseUrl("http://localhost:8080")
                .build();
        //get request for the CSRF cookie.
        WebClient.RequestHeadersSpec<?> getRequest = webClient.get()
                .uri("/login");
        //post request for the spring security session cookie.
        WebClient.RequestHeadersSpec<?> postRequest = webClient.post()
                .uri("/login")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED_VALUE)
                .body(BodyInserters.fromFormData("username", "username")
                        .with("password", "password"));
                        //services that checks if the given request needs extra authentication
        return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), serverWebExchange.getRequest().getPath().toString())
                .log()
                //gets the csrf token from the GET request
                .flatMap(isRequired -> getRequest.exchangeToMono(response -> Mono.just(response.cookies().getFirst("XSRF-TOKEN").getValue())))
                //combines the previous token with the POST request SESSION cookie,
                //THEN secures the last request with both credentials
                .zipWith(postRequest.exchangeToMono(resp -> Mono.just(resp.cookies().getFirst("SESSION").getValue())),
                        AuthenticationFilter::secureAuthRequest)
                //gets the exchange from the request and converts the body into a String
                .flatMap(AuthenticationFilter::getRequestExchange)
                //code to validate if it's doing something. Not implemented yet because it never executes.
                .flatMap(s -> Mono.just(s.equals("")))
                .onErrorResume(e -> {
                    throw (CustomException) e;//breaks the execution
                })
                .then(webFilterChain.filter(serverWebExchange));//continues the execution
    }

The secureAuthRequest and getRequestExchange methods invoked

//adds the springsession cookie and csrf cookie to the request
private static WebClient.RequestHeadersSpec<?> secureAuthRequest(String csrf, String spring) {

        WebClient webClient = WebClient.builder()
                .baseUrl("http://localhost:8080")
                .build();
        WebClient.RequestHeadersSpec<?> request = webClient.post()
                .uri("/authcheck")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
        request.header("X-XSRF-TOKEN", csrf);
        request.cookies( cookies -> cookies.add(  "XSRF-TOKEN", csrf) );
        request.header("Authorization", spring);
        return request;
    }

//gets the body as string.
private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {

        return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
    }

However, when a request is bound to be authenticated, the log is the following:

2021-10-26 23:57:18.760  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-10-26 23:57:18.761  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | request(unbounded)
2021-10-26 23:57:18.761  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | onNext(true)
2021-10-26 23:57:18.762  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | onComplete()

As far as I know, the stream of data starts with a subscription and a posterior request (which I think returns a TRUE from the accessPointService.getAccessPointAuthorizationRequirement method Mono value, if I'm wrong please correct me), but then the 'onComplete()' log shows up. I don't know exactly what the onComplete() log means, since it's being shown before the execution of the getRequestExchange method (which is invoked). The Mono.just(s.equals("")) piece of code never executes.

I've read a lot about how 'nothing happens until you subscribe', but I still don't know why the reactive flow is being invoked at all if I never explicitly subscribe to the stream, and neither I know how to implement it, since it only returns a Disposable (I guess I can throw exceptions from within?). Also, I hear about decoupling when multiple subscribers are being invoked, so I tried to avoid them as possible.

Any help regarding reactive programming, reactor-core, or the specific flow and how to improve it it's appreciated.

Cheers.


Solution

  • So after some research and thanks to @Toerktumlare 's comments, and figured what was happening and what I changed/applied to this.

    So for the 'onComplete()' log, it marks the end of a producer of data. So to see the full stack of the operation, I needed to chain each producer with its own log. For example:

    Mono.just(Boolean.FALSE)
        .log()
        .flatMap(booleanVal -> Mono.just(booleanVal.toString()))
        .log()
        .subscribe(stringVal -> System.out.println("This is the boolean value " + stringVal));
    

    That will produce the trace for the initial producer and the flatMap operation.

    Now, onto the main problem, the issue was within the getRequestExchange method:

    //gets the body as string.
    private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {
    
            return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
        }
    

    The problem was hidden in the bodyToMono method. According to this site https://medium.com/@jeevjyotsinghchhabda/dont-let-webclient-s-bodytomono-trick-you-645123b3e0a9 , if the response to this request has no body for whatever reason, will not throw any error, but just return a Mono.empty(). Since that the flow was not prepared for such a producer, it ended right there.

    In my case, the problem was spring cloud security. I provided the Authorization credential, but not the associated SESSION cookie in the request. So the request returned a 302 (Found) without body. That was the problem (not the reactive flow itself).

    So, after that, I modified the request, and @Toerktumlare 's comments helped me develop a working solution:

    //service that returns if certain resource needs authentication or not, or if it's not even configured
    return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), FWKUtils.translateAccessPointPath(serverWebExchange.getRequest().getPath().pathWithinApplication().elements()))
                    //if the response is a Mono Empty, then returns a not acceptable exception
                    .switchIfEmpty(Mono.defer(() -> throwNotAcceptable(serverWebExchange)))
                    //takes the boolean value to check if extra auth is needed.
                    .flatMap(isRequired -> validateAuthenticationRequirement(isRequired))
                    //gets the access token - the extra auth credential
                    .flatMap(isRequired -> getHeaderToken(serverWebExchange))
                    //from this access generates a WebClient to the specific authentication service - from a webClientProvider to not create too many WebClients.
                    .flatMap(accessToken -> generateAuthenticationRequest(webClientProvider.getInstance(), accessToken))
                    //gets the CRSF token credential and secures the request (adds it to the header and the cookies)
                    .zipWith(getCredential(webClientProvider.getInstance(), "csrf"), (securedRequest, csrfToken) -> secureAuthenticationRequest(securedRequest, csrfToken, "X-XSRF-TOKEN", "XSRF-TOKEN"))
                    //gets the SESSION (spring cloud security) token credential and secures the request (adds it to the header and the cookies)
                    .zipWith(getCredential(webClientProvider.getInstance(), "spring-cloud"), (securedRequest, sessionToken) -> secureAuthenticationRequest(securedRequest, sessionToken, "Authorization", "SESSION"))
                    //does the request and gets the response
                    .map(requestBodySpecs -> requestBodySpecs.retrieve())
                    //from the response, maps it to a specific DTO. The single() clause is to validate that a body is present.
                    .flatMap(clientResponse -> clientResponse.bodyToMono(SecurityCredentialResponseDTO.class).single())
                    //checks the authentication and throws a Unauthorizedstatus if its not valid.
                    .flatMap(responseDTO -> checkTokenAuthentication(serverWebExchange, responseDTO))
                    //if an error is present, then throws it 
                    .onErrorResume(e -> {
                        if (e instanceof FWKException.GenericException) {
                            throw (FWKException.GenericException) e;
                        }
                        throw (RuntimeException) e;
                    })
                    //finally, continues the execution if no exception was thrown.
                    .then(webFilterChain.filter(serverWebExchange));
    

    There's a bit more that I implemented in this solution (storing the CSRF and spring-cloud credential to avoid innecesary calls).