Search code examples
quarkusinterceptorvert.xmutiny

Custom interceptor in quarkus mutiny web client


I am using Quarkus and Mutiny in test application.

Creating the web client like below which will be used to interact with other micro services.

https://smallrye.io/smallrye-mutiny-vertx-bindings/2.16.2/apidocs/io/vertx/mutiny/ext/web/client/WebClient.html#create(io.vertx.mutiny.core.Vertx,io.vertx.ext.web.client.WebClientOptions)

Webclient client = Webclient.create(vertex, webClientOptions);

The web client is being used to interact with other micro services like below . This is one of the example. For all the rest client interaction will be using the same Web client.

private static final String URL ="https://en.wikipedia.org/w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks";
        
@GET
@Path("/web")
public Uni<JsonArray> retrieveDataFromWikipedia() {                     
    return client.getAbs(URL).send()                                    
                 .onItem().transform(HttpResponse::bodyAsJsonObject)         
                 .onItem().transform(json -> json.getJsonObject("parse")     
                                                .getJsonArray("langlinks"));
}

For each outbound rest request I want to add the logging correlation id the request header. So I was exploring if we can add any interceptor to the Web client. But I am unable to find any way to that. Web client does not have any interceptor method.
Is there any way to achieve this?


Solution

  • Vert.x Web has an internal API (WebClientInternal) to attach interceptors to each requests and responses.

    The following example based on Quarkus 3.3.0.Final (Vert.x 4.4.4, Smallrye Mutiny Vert.x bindings 3.5.0) using Resteasy Reactive extension:

    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-resteasy-reactive</artifactId>
    </dependency>
    

    and

    <dependency>
        <groupId>io.smallrye.reactive</groupId>
        <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
    </dependency>
    

    managed dependency.

    Phase #1 - attach interceptor

    @Path("/hello")
    public class ExampleResource {
    
        // Schema changed to http on purpose to intercept a redirect response
        private static final String URL = "http://en.wikipedia.org/w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks";
    
        // Used only to delay outgoing requests
        private final Random random = new Random();
    
        @Inject
        Vertx vertx; // io.vertx.mutiny.core.Vertx;
    
        @GET
        @Produces(MediaType.TEXT_PLAIN)
        public Uni<JsonArray> intercepted() {
    
            var options = new WebClientOptions();
            var client = WebClient.create(vertx, options);
    
            // Unwrap internal API to add interceptors
            var delegate = (WebClientInternal) client.getDelegate();
            delegate
                    .addInterceptor(ExampleResource::phaseInterceptor)
                    .addInterceptor(this::correlationInterceptor);
    
            return client.getAbs(URL).send()
                    .onItem().transform(HttpResponse::bodyAsJsonObject)
                    .onItem().transform(json -> json.getJsonObject("parse")
                            .getJsonArray("langlinks"));
        }
    
    // ...
    }
    

    Keep in mind those interceptors called multiple times (in case of redirect or retry, etc) in serveral phases, so it must be handled inside the incerceptor.

    PhaseInterceptor will display different log messages to demonstrate different phases:

    private static void phaseInterceptor(HttpContext<?> context) {
    
        String msg = switch (context.phase()) {
            case PREPARE_REQUEST -> "Preparing request to: " + context.request().host();
            case CREATE_REQUEST -> "Creating request to: " + context.request().uri();
            case SEND_REQUEST -> "Sending " + context.request().method() + " request to " + context.request().host();
            case FOLLOW_REDIRECT -> "Redirecting to: " + context.clientResponse().getHeader("Location");
            case RECEIVE_RESPONSE -> "Got " + context.clientResponse().statusMessage() + " response";
            case DISPATCH_RESPONSE ->
                    "Reading " + context.clientResponse().getHeader("Content-Length") + " byte(s) length response";
            case FAILURE -> "Something went wrong";
        };
        Log.info(msg);
        context.next();
    }
    

    Don't forget to call context.next() to proceed next call in the interceptor chain.

    When calling the endpoint

    curl --head -v http://localhost:8080/hello/intercepted
    

    phaseInterceptor will log something like this:

    2023-08-27 22:35:17,304 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Preparing request to: en.wikipedia.org
    2023-08-27 22:35:17,307 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Creating request to: /w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks
    2023-08-27 22:35:17,505 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Sending GET request to en.wikipedia.org
    2023-08-27 22:35:17,563 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Redirecting to: https://en.wikipedia.org/w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks
    2023-08-27 22:35:17,564 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Creating request to: /w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks
    2023-08-27 22:35:17,894 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Sending GET request to en.wikipedia.org
    2023-08-27 22:35:18,070 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Got OK response
    2023-08-27 22:35:18,079 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Reading 440 byte(s) length response
    

    Phase #2 Associate request and response

    Vert.x HttpContext is useful to maintain any data for the lifetime of the request.

    private void correlationInterceptor(HttpContext<?> context) {
        if (context.phase() == ClientPhase.PREPARE_REQUEST) {
            var correlationId = UUID.randomUUID();
            var delay = random.nextLong(500);
            Log.infof("Request of id [%s] delayed %4d milliseconds", correlationId, delay);
            vertx.setTimer(delay, l -> {
                // Set unique identifier as a context attribute
                context.set("CorrelationId", correlationId);
                // (Optional) set header value if it needed
                context.request().putHeader("X-Correlation-Id", correlationId.toString());
                context.next();
            });
        } else if (context.phase() == ClientPhase.RECEIVE_RESPONSE) {
            // Read shared correlationId from context data
            var correlationId = context.get("CorrelationId");
            // (Optional) set header value if it needed
            context.clientResponse().headers().add("X-Correlation-Id", correlationId.toString());
            Log.infof("Got response for correlation id: [%s]", correlationId);
            context.next();
        } else {
            context.next();
        }
    }
    
    2023-08-27 23:01:10,278 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Request of id [ea8d37fe-19a8-431c-91a3-553a2987f800] delayed  104 milliseconds
    2023-08-27 23:01:10,719 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Got response for correlation id: [ea8d37fe-19a8-431c-91a3-553a2987f800]
    2023-08-27 23:01:10,725 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Request of id [7d920c38-ccba-4f27-a95e-594c23ac2fa2] delayed   73 milliseconds
    2023-08-27 23:01:10,725 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Request of id [0862f366-a6a4-4327-8f54-a6c98437db68] delayed   53 milliseconds
    2023-08-27 23:01:10,727 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Request of id [ea79effa-b225-441c-a9fc-d359fb4e597f] delayed  210 milliseconds
    2023-08-27 23:01:10,727 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Request of id [7a8197da-4ea3-4002-a95f-a389841596fc] delayed  204 milliseconds
    2023-08-27 23:01:11,112 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Got response for correlation id: [0862f366-a6a4-4327-8f54-a6c98437db68]
    2023-08-27 23:01:11,153 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Got response for correlation id: [7d920c38-ccba-4f27-a95e-594c23ac2fa2]
    2023-08-27 23:01:11,257 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Got response for correlation id: [7a8197da-4ea3-4002-a95f-a389841596fc]
    2023-08-27 23:01:11,298 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Got response for correlation id: [ea79effa-b225-441c-a9fc-d359fb4e597f]
    

    Finally here is a simplified version of correlationInterceptor without delay.

    private void correlationInterceptor(HttpContext<?> context) {
        if (context.phase() == ClientPhase.PREPARE_REQUEST) {
            var correlationId = UUID.randomUUID();
            context.set("CorrelationId", correlationId);
            context.request().putHeader("X-Correlation-Id", correlationId.toString());
        } else if (context.phase() == ClientPhase.RECEIVE_RESPONSE) {
            var correlationId = context.get("CorrelationId");
            context.clientResponse().headers().add("X-Correlation-Id", correlationId.toString());
            Log.infof("Got response for correlation id: [%s]", correlationId);
        }
        context.next();
    }