java spring-boot apache-camel spring-webflux reactive-streams

How to set up different WebFlux client properties for different Apache Camel routes?


In the route setup, a WebClient is built with WebClient.builder() and subscribed to before the route is declared:

@Override
  public void configure() {
    createSubscription(activeProfile.equalsIgnoreCase("RESTART"));
    from(String.format("reactive-streams:%s", streamName))
        .to("log:camel.proxy?level=INFO&groupInterval=500000")
        .to(String.format("kafka:%s?brokers=%s", kafkaTopic, kafkaBrokerUrls));
  }

  private void createSubscription(boolean restart) {
    WebClient.builder()
        .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_XML_VALUE)
        .build()
        .post()
        .uri(initialRequestUri)
        .body(BodyInserters.fromObject(restart ? String.format(restartRequestBody, ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("[UTC]", "")) : initialRequestBody))
        .retrieve()
        .bodyToMono(String.class)
        .map(initResp ->
            new JSONObject(initResp)
                .getJSONObject("RESPONSE")
                .getJSONArray("RESULT")
                .getJSONObject(0)
                .getJSONObject("INFO")
                .getString("SSEURL")
        )
        .flatMapMany(url -> {
          log.info(url);
          return WebClient.create()
              .get()
              .uri(url)
              .retrieve()
              .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
              })
              .flatMap(sse -> {
                    val data = new JSONObject(sse.data())
                        .getJSONObject("RESPONSE")
                        .getJSONArray("RESULT")
                        .getJSONObject(0)
                        .getJSONArray(apiName);
                    val list = new ArrayList<String>();
                    for (int i = 0; i < data.length(); i++) {
                      list.add(data.getJSONObject(i).toString());
                    }
                    return Flux.fromIterable(list);
                  }
              );
            }
        )
        .onBackpressureBuffer()
        .flatMap(msg -> camelReactiveStreamsService.toStream(streamName, msg, String.class))
        .doFirst(() -> log.info(String.format("Reactive stream %s was %s", streamName, restart ? "restarted" : "started")))
        .doOnError(err -> {
          log.error(String.format("Reactive stream %s has terminated with error, restarting", streamName), err);
          createSubscription(true);
        })
        .doOnComplete(() -> {
          log.warn(String.format("Reactive stream %s has completed, restarting", streamName));
          createSubscription(true);
        })
        .subscribe();
  }

As I understand it, this WebClient setup applies to the whole Spring Boot app rather than to a specific Apache Camel route (it isn't bound to a route id or URL in any way). As a result, new routes that use reactive streams from other URLs, with different headers or initial messages, will get this setup too, which isn't wanted.

So the question is: is it possible to create a WebClient setup that is associated with a specific route rather than with the whole application, and have it applied only to that route?

Is this configuration possible with Spring DSL?


Solution

  • The approach I ended up with is rather complex:

    1. Create 2 routes. The first one runs first, and only once, and triggers a specific method of a specific bean, passing the settings for WebClient.builder() as method parameters and starting the WebFlux subscription. And yes, the reactive streams setup is done within the Spring Boot app's Spring context, not the Apache Camel context, so it has no direct association with the route other than being called when that route starts. The route looks like:

       <?xml version="1.0" encoding="UTF-8"?>
      
    2. Provide the bean. I have put it in the Spring Boot app, not the Apache Camel context, as shown below. The drawback is that it has to be there whether or not the specific route will run, so it is always in memory.

       import org.apache.camel.CamelContext;
       import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
       import org.json.JSONArray;
       import org.json.JSONObject;
       import org.slf4j.Logger;
       import org.slf4j.LoggerFactory;
       import org.springframework.core.ParameterizedTypeReference;
       import org.springframework.http.HttpHeaders;
       import org.springframework.http.MediaType;
       import org.springframework.http.codec.ServerSentEvent;
       import org.springframework.stereotype.Component;
       import org.springframework.web.reactive.function.BodyInserters;
       import org.springframework.web.reactive.function.client.WebClient;
       import reactor.core.publisher.Flux;

       import java.time.ZoneId;
       import java.time.ZonedDateTime;
       import java.util.ArrayList;
      
       @Component
       public class WebFluxSetUp {
           private final Logger logger = LoggerFactory.getLogger(WebFluxSetUp.class);
           private final CamelContext camelContext;
           private final CamelReactiveStreamsService camelReactiveStreamsService;
      
           WebFluxSetUp(CamelContext camelContext, CamelReactiveStreamsService camelReactiveStreamsService) {
               this.camelContext = camelContext;
               this.camelReactiveStreamsService = camelReactiveStreamsService;
           }
      
           public void executeWebfluxSetup(boolean restart, String initialRequestUri, String restartRequestBody,
                                           String initialRequestBody, String apiName, String streamName) {
               // On restart, the current UTC timestamp is substituted into the restart request body.
               String requestBody = restart
                       ? String.format(restartRequestBody, ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("[UTC]", ""))
                       : initialRequestBody;

               WebClient.builder()
                       .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_XML_VALUE)
                       .build()
                       .post()
                       .uri(initialRequestUri)
                       // Note: fromObject is deprecated since Spring 5.2 in favor of fromValue.
                       .body(BodyInserters.fromObject(requestBody))
                       .retrieve()
                       .bodyToMono(String.class)
                       // The initial response carries the URL of the server-sent events stream.
                       .map(initResp -> new JSONObject(initResp)
                               .getJSONObject("RESPONSE")
                               .getJSONArray("RESULT")
                               .getJSONObject(0)
                               .getJSONObject("INFO")
                               .getString("SSEURL"))
                       .flatMapMany(url -> {
                           logger.info(url);
                           return WebClient.create()
                                   .get()
                                   .uri(url)
                                   .retrieve()
                                   .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
                                   })
                                   // Each event holds a JSON array; emit its elements one by one.
                                   .flatMap(sse -> {
                                       JSONArray data = new JSONObject(sse.data())
                                               .getJSONObject("RESPONSE")
                                               .getJSONArray("RESULT")
                                               .getJSONObject(0)
                                               .getJSONArray(apiName);
                                       ArrayList<String> list = new ArrayList<String>();
                                       for (int i = 0; i < data.length(); i++) {
                                           list.add(data.getJSONObject(i).toString());
                                       }
                                       return Flux.fromIterable(list);
                                   });
                       })
                       .onBackpressureBuffer()
                       // Push every message into the named Camel reactive stream.
                       .flatMap(msg -> camelReactiveStreamsService.toStream(streamName, msg, String.class))
                       .doFirst(() -> logger.info(String.format("Reactive stream %s was %s", streamName, restart ? "restarted" : "started")))
                       .doOnError(err -> {
                           logger.error(String.format("Reactive stream %s has terminated with error, restarting", streamName), err);
                           executeWebfluxSetup(true, initialRequestUri, restartRequestBody, initialRequestBody, apiName, streamName);
                       })
                       .doOnComplete(() -> {
                           logger.warn(String.format("Reactive stream %s has completed, restarting", streamName));
                           executeWebfluxSetup(true, initialRequestUri, restartRequestBody, initialRequestBody, apiName, streamName);
                       })
                       .subscribe();
           }
       }
      
    3. Another drawback is that when the route is stopped, the WebFlux client still keeps hitting the reactive stream URL. There is no route-associated API or event handler that could stop it without being hard-coded to the specific route.
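
One possible way to soften the last drawback is to keep a cancel handle per stream name, so that a running subscription can be stopped later without knowing anything else about the route. The sketch below is only an illustration under assumptions: StreamSubscriptions, register, stop and isActive are hypothetical names, not part of Camel, Spring or Reactor, and it uses only the JDK. In the bean above, the handle could be the dispose method of the Disposable that subscribe() returns.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper (not a Camel, Spring or Reactor class): remembers one
 * cancel handle per stream name so a running subscription can be stopped later.
 */
class StreamSubscriptions {
    private final Map<String, Runnable> cancelHandles = new ConcurrentHashMap<>();

    /** Stores the handle for streamName, cancelling any previously stored one first. */
    void register(String streamName, Runnable cancelHandle) {
        Runnable previous = cancelHandles.put(streamName, cancelHandle);
        if (previous != null) {
            previous.run();
        }
    }

    /** Cancels and forgets the subscription; returns false if none was registered. */
    boolean stop(String streamName) {
        Runnable handle = cancelHandles.remove(streamName);
        if (handle == null) {
            return false;
        }
        handle.run();
        return true;
    }

    /** Tells whether a handle is currently stored for streamName. */
    boolean isActive(String streamName) {
        return cancelHandles.containsKey(streamName);
    }
}
```

With something like this, executeWebfluxSetup could store the result of subscribe() via register(streamName, disposable::dispose), and stop(streamName) could be called from wherever the application learns that the route has stopped. Where that call should live is left open here. Since cancelling a Reactor subscription does not emit a completion or error signal, the restart callbacks in doOnError and doOnComplete should not fire on such a stop.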