Search code examples
javaconcurrencyreactive-programmingspring-webclient

I have a complicated set of tasks using Java Web-Client requests that need to run in parallel and finally block to return a single response


I am new to the Web Client reactive library.

Here is my problem :

It starts with a user submitting a request to post a packet of documents. They wait for a response.

A service consuming this request needs to run several tasks in parallel. Some of the sub-tasks within each task have to finish first ( 2 different Get requests ) before attempting the last sub-task which is the main Post request. Then I want to wait for the collection of all tasks 'Post sub-tasks' to finish (representing the packet), and then collect and reconcile the responses.

I need to reconcile at the end and make sure the entire parallel process succeeds (sending the packet) and then respond to a server (user) indicating that the process was successful or not.

My pseudo flow:

  • Create a set of documents to post to a server one at a time. A packet can contain up to 10 documents. (List of DocumentAndMetaData). Initially each document would contain some pre-filled known values like file path and document name.
    • For each document in a packet: (run in parallel)
      • I need to do file I/O and create a meta data object- call it getDocumentAndMetadata. To create a Metadata object I must do some steps first within getDocumentAndMetadata:
        • Do a get Request to get Key A- call it getKeyA(requestA)
        • Do a get request to get Key B- call it getKeyB(requestB)
        • Merge Key A and Key B requests and use the responses from those requests to update the metadata object.
      • Then Read File to get a Byte array - call it getFile
      • Then pass the byte array (document) and metadata object to a function that:
        • Does a Http Post to a server sending the byte array and metadata object in the post request.
        • Accumulate the responses for each Post which are strings.
        • then block until all the documents are sent.

Finally evaluate all the string responses that are returned from the Post requests and make sure the number of responses match the number of documents posted to a server. Track any errors. If any Get or Post request fails, log the error.

I figured out how to do all these steps running block() on each sub-task 'Get request' and then block() on the main 'Post request', but I am afraid the performance will suffer using this approach.

I need help with how to generate the flow using Web-Client and reactive non blocking parallel processes.

Thanks for any help.


Solution

  • ' I am afraid the performance will suffer using this approach.' - You are right. After all, the whole purpose of using WebFlux is to create a non-blocking application.

    I have tried to mock most of the logic. I hope you can correlate the solution with your use-case.

    @RestController
    public class MyController {
    
      @Autowired
      private WebClient webClient;
    
      @PostMapping(value = "/postPacketOfDocs")
      public Mono<ResponseEntity<String>> upload(@RequestBody Flux<String> documentAndMetaDataList) {
    
        return documentAndMetaDataList
            .flatMap(documentAndMetaData -> {
              //do File I/O
              return getDocumentAndMetadata(documentAndMetaData);
            })
            .map(String::getBytes) //Read File to get a Byte array
            .flatMap(fileBytes -> {
              return webClient.post().uri("/send/byte/and/metadata")
                  .retrieve().bodyToMono(String.class);
            })
            .collectList()
            .flatMap(allResponsesFromEachPOST -> {
              //Do some validation 
              boolean allValidationsSuccessful = true;
              if (allValidationsSuccessful) {
                return Mono.just("Success");
              } else {
                return Mono.error(new RuntimeException()); //some custom exception which can be handled by @ExceptionHandler
              }
            })
            .flatMap(msg -> Mono.just(ResponseEntity.ok().body(msg)));
    
      }
    
      private Mono<String> getDocumentAndMetadata(String documentAndMetaData) {
        String metadata = "";//get metadata object from documentAndMetaData
        Mono<String> keyAResponse = webClient.get().uri("/get/keyA").retrieve().bodyToMono(String.class);
        Mono<String> keyBResponse = webClient.get().uri("/get/keyB").retrieve().bodyToMono(String.class);
        return keyAResponse.concatWith(keyBResponse)
            .collectList()
            .flatMap(responses -> updateMetadata(responses, metadata));
      }
    
      private Mono<String> updateMetadata(List<String> responses, String metadata) {
        String newMedataData = metadata + responses.get(0) + responses.get(1); //some update logic
        return Mono.just(newMedataData);
      }
    }