
Scatter & Gather using Spring WebClient


I am new to reactive programming and am trying to build a service that sends requests to two backend services in parallel and combines the results. The two backend services have different response structures, so I have created mapper methods to convert both into a common Response structure.

This is what I have right now, and it works when both services return results.

public Mono<List<Response>> getRecords(String input){

    Mono<FirstApiResponse> gResp = this.firstWebClient.get().uri(uriBuilder -> uriBuilder
            .path("/")
            .queryParam("q", input)
            .build()).retrieve()
            .bodyToMono(FirstApiResponse.class).log()
            .timeout(Duration.ofSeconds(50L));

    Mono<SecondApiResponse> iResp = this.secondWebClient.get().uri(uriBuilder -> uriBuilder
            .path("/search")
            .queryParam("term", input)
            .build()).retrieve()
            .bodyToMono(SecondApiResponse.class).log()
            .timeout(Duration.ofSeconds(50L));

    // Mono.zip subscribes to both sources, so the two requests run concurrently.
    return Mono.zip(gResp, iResp).map(objects -> {
        // Build the list inside the lambda so each subscription gets its own copy.
        List<Response> response = new ArrayList<>();
        response.addAll(Mapper.convert(objects.getT1()));
        response.addAll(Mapper.convert(objects.getT2()));
        return response;
    });
}

public static List<Response> convert(FirstApiResponse resp){
    // ... map resp to a list of Response objects ...

    return response;
}

public static List<Response> convert(SecondApiResponse resp){
    // ... map resp to a list of Response objects ...

    return response;
}

I don't know if this is the right way to do it. Moreover, I want it to work so that if either service returns an error, the results from the other service are still returned. Right now it throws an exception, and I can't figure out how to handle it properly.

How do I handle these errors properly?


Solution

  • This is a valid scenario, and there are many ways to handle it. One crude way is to use onErrorReturn to substitute a fallback value that you can handle downstream. It could be either an empty response or a wrapper around your model, whichever fits your scenario.

    Mono<Wrapper<FirstApiResponse>> gResp = this.firstWebClient.get().uri(uriBuilder -> uriBuilder
         .path("/")
         .queryParam("q", input)
         .build()).retrieve()
         .bodyToMono(FirstApiResponse.class).log()
         .map(response -> new Wrapper<FirstApiResponse>().withResponse(response))
         .timeout(Duration.ofSeconds(50L))
         .doOnError(throwable -> logger.error("Failed", throwable))
         // On any error (including the timeout), emit a wrapper carrying the error instead
         .onErrorReturn(new Wrapper<FirstApiResponse>().withError(YourDefaultErrorReponse(...)));

    Mono<Wrapper<SecondApiResponse>> iResp = this.secondWebClient.get().uri(uriBuilder -> uriBuilder
        .path("/search")
        .queryParam("term", input)
        .build())
        .retrieve()
        .bodyToMono(SecondApiResponse.class).log()
        .map(response -> new Wrapper<SecondApiResponse>().withResponse(response))
        .timeout(Duration.ofSeconds(50L))
        .doOnError(throwable -> logger.error("Failed", throwable))
        .onErrorReturn(new Wrapper<SecondApiResponse>().withError(YourDefaultErrorReponse(...)));
    

    Again, there are several ways to return a default response. A simple one is to use something like a wrapper:

    public final class Wrapper<T> {
      private T response ;
      private Error error;
          
      public Wrapper<T> withResponse ( T response ){
         this.response = response;
         return this;
      }
      public Wrapper<T> withError( Error error) {
         this.error = error;
         return this;
      }
    
      public boolean hasError(){
        return error != null;
      }
          
      public T getResponse(){
       return response;
      }
    }
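
The answer stops before the gather step, so here is a minimal sketch of how the two wrapped results might be merged once both are available. It is plain Java so it can be read and run on its own. The `Wrapper` below is a trimmed stand-in for the class above, with the error reduced to a `String`. `GatherSketch`, `merge` and `addIfPresent` are hypothetical names, not part of Spring or Reactor, and the mapper functions stand in for the two `Mapper.convert` overloads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Trimmed stand-in for the Wrapper above (assumption: error kept as a plain message).
final class Wrapper<T> {
    private T response;
    private String error;

    Wrapper<T> withResponse(T response) { this.response = response; return this; }
    Wrapper<T> withError(String error) { this.error = error; return this; }
    boolean hasError() { return error != null; }
    T getResponse() { return response; }
}

// Hypothetical helper that performs the "gather" half of scatter-gather.
final class GatherSketch {

    // Appends the mapped records of one backend; a failed or empty wrapper adds nothing.
    static <T, R> void addIfPresent(List<R> target, Wrapper<T> wrapper,
                                    Function<T, List<R>> mapper) {
        if (!wrapper.hasError() && wrapper.getResponse() != null) {
            target.addAll(mapper.apply(wrapper.getResponse()));
        }
    }

    // Merges both backends into one list, first backend's records first.
    static <A, B, R> List<R> merge(Wrapper<A> first, Function<A, List<R>> firstMapper,
                                   Wrapper<B> second, Function<B, List<R>> secondMapper) {
        List<R> merged = new ArrayList<>();
        addIfPresent(merged, first, firstMapper);
        addIfPresent(merged, second, secondMapper);
        return merged;
    }
}
```

In the reactive chain, a helper like this could be called from the `map` that follows `Mono.zip(gResp, iResp)`, passing `objects.getT1()` and `objects.getT2()` together with the two mappers. Because each source already falls back to an error-carrying wrapper, a failure on one side no longer fails the zip. One caveat: `Mono.zip` completes empty if either source completes without a value, so a backend that returns an empty body may still need a default.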