Search code examples
spring-webfluxwebclientproject-reactor

Chaining Reactive Asynchronus calls in spring


I’m very new to the SpringReactor project. Until now I've only used Mono from WebClient .bodyToMono() steps, and mostly block() those Mono's or .zip() multiple of them.

But this time I have a usecase where I need to asynchronously call methods in multiple service classes, and those multiple service classes are calling multiple backend api.

I understand Project Reactor doesn't provide asynchronous flow by default.
But we can make the publishing and/or subscribing on different thread and make code asynchronous And that's what I am trying to do. I tried to read the documentation here reactor reference but still not clear.

For the purpose of this question, I’m making up this imaginary scenario. that is a little closer to my use case.

Let's assume we need to get a search response from google for some texts searched under images.

Example Scenario

Let's have an endpoint in a Controller This endpoint accepts the following object from request body

MultimediaSearchRequest{
   Set<String> searchTexts; //many texts. 
   boolean isAddContent;
   boolean isAddMetadata;
}

in the controller, I’ll break the above single request object into multiple objects of the below type. MultimediaSingleSearchRequest{

       String searchText;
       boolean isAddContent;
       boolean isAddMetadata;
}

This Controller talks to 3 Service classes.
Each of the service classes has a method searchSingleItem.
Each service class uses a few different backend Apis, but finally combines the results of those APIs responses into the same type of response class, let's call it MultimediaSearchResult.

class JpegSearchHandleService {  
  public MultimediaSearchResult searchSingleItem
              (MultimediaSingleSearchRequest req){

    return comboneAllImageData( 
          getNameApi(req),
          getImageUrlApi(req),
          getContentApi(req)  //dont call if req.isAddContent false 
        )
  }
} 
 
class GifSearchHandleService { 
  public  MultimediaSearchResult searchSingleItem
               (MultimediaSingleSearchRequest req){

    return comboneAllImageData( 
          getNameApi(req),
          gitPartApi(req),
          someRandomApi(req),
          soemOtherRandomApi(req) 
        )
  }
}

class VideoSearchHandleService {  
  public MultimediaSearchResult searchSingleItem
              (MultimediaSingleSearchRequest req){

    return comboneAllImageData( 
          getNameApi(req),
          codecApi(req),
          commentsApi(req),
          anotherApi(req) 
        )
  }

}

In the end, my controller returns the response as a List of MultimediaSearchResult

Class MultimediaSearchResponse{

 List< MultimediaSearchResult> results;

}

If I want to use this all asynchronously using the project reactor. how to achieve it.

Like calling searchSingleItem method in each service for each searchText asynchronously. Even within the services call each backend API asynchronously (I’m already using WebClient and converting response bodyToMono for backend API calls)


Solution

  • First, I will outline a solution for the upper "layer" of your scenario.

    The code (a simple simulation of the scenario):

    public class ChainingAsyncCallsInSpring {
    
        public Mono<MultimediaSearchResponse> controllerEndpoint(MultimediaSearchRequest req) {
            return Flux.fromIterable(req.getSearchTexts())
                .map(searchText -> new MultimediaSingleSearchRequest(searchText, req.isAddContent(), req.isAddMetadata()))
                .flatMap(multimediaSingleSearchRequest -> Flux.merge(
                    classOneSearchSingleItem(multimediaSingleSearchRequest),
                    classTwoSearchSingleItem(multimediaSingleSearchRequest),
                    classThreeSearchSingleItem(multimediaSingleSearchRequest)
                ))
                .collectList()
                .map(MultimediaSearchResponse::new);
        }
    
        private Mono<MultimediaSearchResult> classOneSearchSingleItem(MultimediaSingleSearchRequest req) {
            return Mono.just(new MultimediaSearchResult("1"));
        }
    
        private Mono<MultimediaSearchResult> classTwoSearchSingleItem(MultimediaSingleSearchRequest req) {
            return Mono.just(new MultimediaSearchResult("2"));
        }
    
        private Mono<MultimediaSearchResult> classThreeSearchSingleItem(MultimediaSingleSearchRequest req) {
            return Mono.just(new MultimediaSearchResult("3"));
        }
    }
    

    Now, some rationale.

    In the controllerEndpoint() function, first we create a Flux that will emit every single searchText from the request. We map these to MultimediaSingleSearchRequest objects, so that the services can consume them with the additional metadata that was provided with the original request.

    Then, Flux::flatMap the created MultimediaSingleSearchRequest objects into a merged Flux, which (as opposed to Flux::concat) ensures that all three publishers are subscribed to eagerly i.e. they don't wait for one another. It works best on this exact scenario, when several independent publishers need to be subscribed to at the same time and their order is not important.

    After the flat map, at this point, we have a Flux<MultimediaSearchResult>.

    We continue with Flux::collectList, thus collecting the emitted values from all publishers (we could also use Flux::reduceWith here).

    As a result, we now have a Mono<List<MultimediaSearchResult>>, which can easily be mapped to a Mono<MultimediaSearchResponse>.

    The results list of the MultimediaSearchResponse will have 3 items for each searchText in the original request.

    Hope this was helpful!


    Edit

    Extending the answer with a point of view from the service classes as well. Assuming that each inner (optionally skipped) call returns a different type of result, this would be one way of going about it:

    public class MultimediaSearchResult {
        private Details details;
        private ContentDetails content;
        private MetadataDetails metadata;
    }
    
    public Mono<MultimediaSearchResult> classOneSearchSingleItem(MultimediaSingleSearchRequest req) {
        return Mono.zip(getSomeDetails(req), getContentDetails(req), getMetadataDetails(req))
            .map(tuple3 -> new MultimediaSearchResult(
                    tuple3.getT1(),
                    tuple3.getT2().orElse(null),
                    tuple3.getT3().orElse(null)
                )
            );
    }
    
    // Always wanted
    private Mono<Details> getSomeDetails(MultimediaSingleSearchRequest req) {
        return Mono.just(new Details("details")); // api call etc.
    }
    
    // Wanted if isAddContent is true
    private Mono<Optional<ContentDetails>> getContentDetails(MultimediaSingleSearchRequest req) {
        return req.isAddContent()
            ? Mono.just(Optional.of(new ContentDetails("content-details"))) // api call etc.
            : Mono.just(Optional.empty());
    }
    
    // Wanted if isAddMetadata is true
    private Mono<Optional<MetadataDetails>> getMetadataDetails(MultimediaSingleSearchRequest req) {
        return req.isAddMetadata()
            ? Mono.just(Optional.of(new MetadataDetails("metadata-details"))) // api call etc.
            : Mono.just(Optional.empty());
    }
    

    Optionals are used for the requests that might be skipped, since Mono::zip will fail if either of the zipped publishers emit an empty value.

    If the results of each inner call extend the same base class or are the same wrapped return type, then the original answer applies as to how they can be combined (Flux::merge etc.)