I am new to the WebClient reactive library.
Here is my problem:
It starts with a user submitting a request to post a packet of documents. They wait for a response.
A service consuming this request needs to run several tasks in parallel. Within each task, two sub-tasks (two different GET requests) have to finish before the last sub-task, the main POST request, is attempted. I then want to wait for the POST sub-tasks of all tasks (together representing the packet) to finish, and collect and reconcile the responses.
I need to reconcile at the end to make sure the entire parallel process (sending the packet) succeeded, and then respond to the user indicating whether the process was successful.
My pseudo flow:
Finally, evaluate all the string responses returned from the POST requests and make sure the number of responses matches the number of documents posted to the server. Track any errors. If any GET or POST request fails, log the error.
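The reconciliation step described above could be sketched in plain Java roughly as follows. This is only an illustration: `PacketReconciler`, `PostOutcome` and `PacketResult` are hypothetical names that are not part of WebClient, and the records assume Java 16 or newer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; all names here are illustrative and not from any library.
final class PacketReconciler {

    /** Outcome of one document's POST: either a response body or an error message. */
    record PostOutcome(String documentId, String response, String error) {
        static PostOutcome ok(String documentId, String response) {
            return new PostOutcome(documentId, response, null);
        }

        static PostOutcome failed(String documentId, String error) {
            return new PostOutcome(documentId, null, error);
        }

        boolean succeeded() {
            return error == null;
        }
    }

    /** Summary for the whole packet: overall flag plus the tracked errors. */
    record PacketResult(boolean success, List<String> errors) {}

    /** Checks that every posted document produced exactly one successful outcome. */
    static PacketResult reconcile(int documentsPosted, List<PostOutcome> outcomes) {
        List<String> errors = new ArrayList<>();
        for (PostOutcome outcome : outcomes) {
            if (!outcome.succeeded()) {
                errors.add(outcome.documentId() + ": " + outcome.error());
            }
        }
        // The packet is complete only if the outcome count matches the document count.
        if (outcomes.size() != documentsPosted) {
            errors.add("expected " + documentsPosted + " outcomes but got " + outcomes.size());
        }
        return new PacketResult(errors.isEmpty(), List.copyOf(errors));
    }
}
```

Keeping failures as data instead of exceptions means one bad document does not hide the results of the others, and the error list can be logged in one place before the final response is built.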
I figured out how to do all these steps by calling block() on each 'Get request' sub-task and then block() on the main 'Post request', but I am afraid performance will suffer with this approach.
I need help building this flow with WebClient as a reactive, non-blocking, parallel process.
Thanks for any help.
"I am afraid the performance will suffer using this approach." You are right. After all, the whole purpose of using WebFlux is to create a non-blocking application, and every block() call holds a thread until the response arrives.
I have mocked most of the logic. I hope you can map the solution onto your use case.
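import java.nio.charset.StandardCharsets;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class MyController {

    @Autowired
    private WebClient webClient;

    @PostMapping(value = "/postPacketOfDocs")
    public Mono<ResponseEntity<String>> upload(@RequestBody Flux<String> documentAndMetaDataList) {
        return documentAndMetaDataList
                // flatMap subscribes to the inner publishers eagerly, so documents are processed
                // concurrently; the order of the results is not guaranteed
                .flatMap(documentAndMetaData -> {
                    // do file I/O
                    return getDocumentAndMetadata(documentAndMetaData);
                })
                // stand-in for reading the file into a byte array
                .map(updated -> updated.getBytes(StandardCharsets.UTF_8))
                .flatMap(fileBytes -> webClient.post().uri("/send/byte/and/metadata")
                        .bodyValue(fileBytes) // attach the payload as the request body
                        .retrieve().bodyToMono(String.class))
                .collectList() // completes once every POST has responded
                .flatMap(allResponsesFromEachPOST -> {
                    // do some validation, e.g. compare the response count with the document count
                    boolean allValidationsSuccessful = true;
                    if (allValidationsSuccessful) {
                        return Mono.just("Success");
                    } else {
                        // some custom exception which can be handled by @ExceptionHandler
                        return Mono.error(new RuntimeException("Packet validation failed"));
                    }
                })
                .map(msg -> ResponseEntity.ok().body(msg));
    }

    private Mono<String> getDocumentAndMetadata(String documentAndMetaData) {
        String metadata = ""; // get metadata object from documentAndMetaData
        Mono<String> keyAResponse = webClient.get().uri("/get/keyA").retrieve().bodyToMono(String.class);
        Mono<String> keyBResponse = webClient.get().uri("/get/keyB").retrieve().bodyToMono(String.class);
        // concatWith subscribes to keyB only after keyA completes, so the two GETs run in sequence
        return keyAResponse.concatWith(keyBResponse)
                .collectList()
                .flatMap(responses -> updateMetadata(responses, metadata));
    }

    private Mono<String> updateMetadata(List<String> responses, String metadata) {
        String newMetadata = metadata + responses.get(0) + responses.get(1); // some update logic
        return Mono.just(newMetadata);
    }
}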
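Two possible refinements, shown as a sketch rather than a definitive implementation. The two GETs in getDocumentAndMetadata are chained with concatWith, which runs them one after the other; Mono.zip subscribes to both at once and emits when both have answered. Also, a single failing request in the controller above fails the whole chain, whereas the question asks to track and log errors, so the sketch turns each failure into a marker value with onErrorResume. The methods `fetchKeyA`, `fetchKeyB` and `postDocument` are hypothetical stand-ins for the webClient.get()/post() calls and are stubbed here so the flow runs without a server; it needs only reactor-core on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Sketch only: the three stubbed methods stand in for real WebClient calls.
final class PacketFlowSketch {

    static Mono<String> fetchKeyA(String doc) {
        return Mono.just("A(" + doc + ")");
    }

    static Mono<String> fetchKeyB(String doc) {
        return Mono.just("B(" + doc + ")");
    }

    // Stub rule (an assumption for the demo): documents named "bad..." are rejected.
    static Mono<String> postDocument(String doc, String keyA, String keyB) {
        if (doc.startsWith("bad")) {
            return Mono.error(new IllegalStateException("POST rejected for " + doc));
        }
        return Mono.just("posted " + doc + " with " + keyA + " and " + keyB);
    }

    /** Both GETs are subscribed together; the POST starts only after both have emitted. */
    static Mono<String> processDocument(String doc) {
        return Mono.zip(fetchKeyA(doc), fetchKeyB(doc))
                .flatMap(keys -> postDocument(doc, keys.getT1(), keys.getT2()))
                // Log the failure and emit a marker so the other documents still complete.
                .onErrorResume(e -> {
                    System.err.println("Document " + doc + " failed: " + e.getMessage());
                    return Mono.just("ERROR " + doc + ": " + e.getMessage());
                });
    }

    /** Processes all documents concurrently and collects the results in input order. */
    static Mono<List<String>> processPacket(List<String> docs) {
        return Flux.fromIterable(docs)
                .flatMapSequential(PacketFlowSketch::processDocument)
                .collectList();
    }
}
```

In a controller you would return the Mono from processPacket (after mapping it to a ResponseEntity) instead of blocking on it. flatMapSequential is used here so that the collected list lines up with the input documents, which makes reconciling the count and the errors at the end simpler than with plain flatMap.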