spring-boot elasticsearch spring-webflux reactive-programming spring-data-elasticsearch

How can the ReactiveElasticsearchClient be enriched with headers from a Mono?


I want to enrich the requests of the Spring Data Elasticsearch ReactiveElasticsearchClient with a JWT (as Authorization header) retrieved from the ReactiveSecurityContextHolder.

The deprecated version of the ReactiveElasticsearchClient, located in the org.springframework.data.elasticsearch.client.erhlc package, is based on Spring's WebClient, so this can be done with an exchange filter:

@Bean
public ReactiveElasticsearchClient deprecatedReactiveElasticsearchClient() {
    final var clientConfiguration = ClientConfiguration.builder()
        .connectedTo(elasticProperties.getUris().toArray(String[]::new))
        .withClientConfigurer(
            ReactiveRestClients.WebClientConfigurationCallback.from(
                webClient -> webClient.mutate().filter(jwtExchangeFilter()).build())
        )
        .build();
    return ReactiveRestClients.create(clientConfiguration);
}


private ExchangeFilterFunction jwtExchangeFilter() {
    return ExchangeFilterFunction.ofRequestProcessor(request -> retrieveJwt()
        .map(jwt -> ClientRequest.from(request).header(HttpHeaders.AUTHORIZATION, "Bearer " + jwt).build())
        // No JWT available: send the request unchanged.
        .defaultIfEmpty(request));
}

private Mono<String> retrieveJwt() {
    return ReactiveSecurityContextHolder.getContext()
        .map(SecurityContext::getAuthentication)
        .map(Authentication::getPrincipal)
        .filter(principal -> principal instanceof Jwt)
        .map(Jwt.class::cast)
        .map(Jwt::getTokenValue);
}

However, the non-deprecated version of the ReactiveElasticsearchClient, located in the org.springframework.data.elasticsearch.client.elc package, is based on Apache's HttpAsyncClient, and I have no clue how to set an auth header whose value is provided by a Mono. There is an ElasticsearchClients.ElasticsearchHttpClientConfigurationCallback.from method that provides an HttpAsyncClientBuilder. The builder can, for example, add an org.apache.http.HttpRequestInterceptor, but it seems to me that these interceptors are not usable in a reactive environment when the values come from a Mono.

Finally, the ClientConfiguration (which can be used with both the deprecated and the non-deprecated client) provides a withHeaders method that accepts a Supplier. However, I guess there is no way to provide a value from a Mono through a Supplier without blocking. Or is there?

I tried to configure the non-deprecated ReactiveElasticsearchClient with a client configuration callback and with a header supplier, but in neither case was I able to supply a header value that comes from a Mono (here ReactiveSecurityContextHolder.getContext()) in a non-blocking manner. I would expect this to be possible in a reactive client. Isn't it?
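To illustrate the Supplier problem described above, here is a minimal sketch that uses only the JDK. A CompletableFuture stands in for the Mono, and buildRequestHeaders, retrieveJwtAsync and blockingHeaderSupplier are hypothetical stand-ins, not Spring Data Elasticsearch API. The point is only that a Supplier is called synchronously and must return a finished value, so an asynchronously produced token can reach it only by blocking.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class HeaderSupplierSketch {

    // Hypothetical stand-in for a client that asks a Supplier for headers
    // while it builds each request. The call is synchronous: the Supplier
    // has to hand back a finished value right away.
    static Map<String, String> buildRequestHeaders(Supplier<Map<String, String>> headerSupplier) {
        return headerSupplier.get();
    }

    // Hypothetical stand-in for retrieveJwt(): the token is only available
    // asynchronously (a CompletableFuture here, a Mono in the question).
    static CompletableFuture<String> retrieveJwtAsync() {
        return CompletableFuture.supplyAsync(() -> "example-token");
    }

    // The only way to bridge the two is to wait for the result inside the
    // Supplier, which blocks the calling thread.
    static Supplier<Map<String, String>> blockingHeaderSupplier() {
        return () -> Map.of("Authorization", "Bearer " + retrieveJwtAsync().join());
    }

    public static void main(String[] args) {
        Map<String, String> headers = buildRequestHeaders(blockingHeaderSupplier());
        System.out.println(headers.get("Authorization")); // prints "Bearer example-token"
    }
}
```

With a real Mono the situation is worse than this sketch suggests: ReactiveSecurityContextHolder reads the security context from the Reactor Context of the current subscription, so a separate block() call inside the Supplier would start a new subscription and would most likely see no authentication at all.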

https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/#elasticsearch.clients.configuration


Solution

  • Since the switch to the new Elasticsearch client, the underlying HTTP client is the one provided by the Elasticsearch transport and, as you correctly mention, it is not reactive. The ReactiveElasticsearchClient calls the async methods of the transport and wraps the returned futures in a Mono.fromFuture() call (or a Flux).

    So currently it is not possible to add a header from a reactive code flow.

    In the latest version of the Elasticsearch client (8.9), the transport has been partly rewritten so that users of the client library can provide their own transport (https://github.com/elastic/elasticsearch-java/issues/550, closed by PR https://github.com/elastic/elasticsearch-java/pull/584). I have not yet had time to check whether this also allows plugging in a WebClient, or how that would work with the reactive code in Spring Data Elasticsearch. There is a ticket for this as well: https://github.com/spring-projects/spring-data-elasticsearch/issues/2604.