Search code examples
spring-webfluxjava-11spring-webclienthexagonal-architecture

How to return a Flux in async/reactive webclient request with subscribe method


I am using spring hexagonal architecture (port and adapter) as my application need to read the stream of data from the source topic, process/transforms the data, and send it to destination topic.

My application need to do the following actions.

  1. Read the data (which will have the call back url)
  2. Make an http call with the url in the incoming data (using webclient)
  3. Get the a actual data and it needs to be transformed into another format.
  4. Send the transformed data to the outgoing topic.

Here is my code,

public Flux<TargeData> getData(Flux<Message<EventInput>> message) 
{
    return message
        .flatMap(it -> {
            Event event = objectMapper.convertValue(it.getPayload(), Event.class);
            String eventType = event.getHeader().getEventType();
            String callBackURL = "";
            if (DISTRIBUTOR.equals(eventType)) {
                callBackURL = event.getHeader().getCallbackEnpoint();
                WebClient client = WebClient.create();
                Flux<NodeInput> nodeInputFlux = client.get()
                    .uri(callBackURL)
                    .headers(httpHeaders -> {
                        httpHeaders.setContentType(MediaType.APPLICATION_JSON);
                        List<MediaType> acceptTypes = new ArrayList<>();
                        acceptTypes.add(MediaType.APPLICATION_JSON);
                        httpHeaders.setAccept(acceptTypes);
                    })
                .exchangeToFlux(response -> {
                    if (response.statusCode()
                            .equals(HttpStatus.OK)) {
                        System.out.println("Response is OK");
                        return response.bodyToFlux(NodeInput.class);
                    }
                    return Flux.empty();
                });
                nodeInputFlux.subscribe( nodeInput -> {
                    SourceData source = objectMapper.convertValue(nodeInput, SourceData.class);
                //  return Flux.fromIterable(this.TransformImpl.transform(source));
                });
            }
        return Flux.empty();
    });
}

The commented line in the above code is giving the compilation as subscribe method does not allow return types.

I need a solution "without using block" here.

Please help me here, Thanks in advance.


Solution

  • I think i understood the logic. What do you may want is this:

    
    public Flux<TargeData> getData(Flux<Message<EventInput>> message) {
        return message
            .flatMap(it -> {
              // 1. marshall and unmarshall operations are CPU expensive and could harm event loop
              return Mono.fromCallable(() -> objectMapper.convertValue(it.getPayload(), Event.class))
                  .subscribeOn(Schedulers.parallel());
            })
            .filter(event -> {
              // 2. Moving the if-statement yours to a filter - same behavior
              String eventType = event.getHeader().getEventType();
              return DISTRIBUTOR.equals(eventType);
            })
            // Here is the trick 1 - your request below return Flux of SourceData the we will flatten
            // into a single Flux<SourceData> instead of Flux<List<SourceData>> with flatMapMany
            .flatMap(event -> {
              // This WebClient should not be created here. Should be a singleton injected on your class
              WebClient client = WebClient.create();
    
              return client.get()
                  .uri(event.getHeader().getCallbackEnpoint())
                  .accept(MediaType.APPLICATION_JSON)
                  .exchangeToFlux(response -> {
                    if (response.statusCode().equals(HttpStatus.OK)) {
                      System.out.println("Response is OK");
                      return response.bodyToFlux(SourceData.class);
                    }
                    return Flux.empty();
                  });
            })
            // Here is the trick 2 - supposing that transform return a Iterable of TargetData, then you should do this and will have Flux<TargetData>
            // and flatten instead of Flux<List<TargetData>>
            .flatMapIterable(source -> this.TransformImpl.transform(source));
    }