spring-boot spring-webflux project-reactor

How to extract content from Mono<List<T>> in WebFlux to pass it down the call chain


I want to extract the List<Payload> from the Mono<List<Payload>> so that I can pass it to a downstream service for processing (or perhaps return it from the read(RequestParams params) method, instead of that method returning void):

@PostMapping("/subset")
public void read(@RequestBody RequestParams params) {
    Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());

    ....
}

where reader.read(...) is a method on an autowired Spring service that uses a WebClient to fetch the data from an external web service API:

public Mono<List<Payload>> read(String date, String assetClasses, String firmAccounts, String id, String password) {
    Flux<Payload> nodes = client
            .get()
            .uri(uriBuilder -> uriBuilder
                    .path("/api/subset")
                    .queryParam("payloads", true)
                    .queryParam("date", date)
                    .queryParam("assetClasses", assetClasses)
                    .queryParam("firmAccounts", firmAccounts)
                    .build())
            .headers(header -> header.setBasicAuth("abc123", "XXXXXXX"))
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, response -> {
                System.out.println("4xx error");
                return Mono.error(new RuntimeException("4xx"));
            })
            .onStatus(HttpStatus::is5xxServerError, response -> {
                System.out.println("5xx error");
                return Mono.error(new RuntimeException("5xx"));
            })
            .bodyToFlux(Payload.class);

    Mono<List<Payload>> records = nodes
            .collectList();

    return records;
}

Calling the blocking result.block() is not allowed on WebFlux's non-blocking request threads and throws an exception:

new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread ...");

What is the proper way to extract the contents of a Mono in WebFlux? Is it some kind of subscribe()? What would the syntax be?
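
A minimal sketch of one common approach, assuming only reactor-core on the classpath: rather than pulling the list out of the Mono, attach the next step with map() (or flatMap() when that step is itself asynchronous) and return the resulting Mono. The names MonoUnwrapSketch, read() and process() are hypothetical stand-ins for the code above, and String stands in for Payload.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoUnwrapSketch {

    // Hypothetical stand-in for reader.read(...): emits one list when subscribed.
    static Mono<List<String>> read() {
        return Flux.just("a", "b", "c").collectList();
    }

    // Hypothetical downstream step that needs the plain List.
    static int process(List<String> payloads) {
        return payloads.size();
    }

    // The list is never extracted: map() hands it to process() once it arrives,
    // and the caller receives a Mono of the processed result.
    static Mono<Integer> readAndProcess() {
        return read().map(MonoUnwrapSketch::process);
    }

    public static void main(String[] args) {
        // block() is used here only because main() runs on an ordinary,
        // non-reactive thread; it is not meant for WebFlux request threads.
        Integer count = readAndProcess().block();
        System.out.println(count); // prints 3
    }
}
```

In a controller, the equivalent would be to change the return type from void to a Mono and return the composed chain. Calling subscribe() by hand is then usually unnecessary, because WebFlux subscribes to the returned Mono when it writes the response.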


Solution

  • While the following does return the value of the Mono observable in the logs:

    @PostMapping("/subset")
    @ResponseBody
    public Mono<ResponseEntity<List<Payload>>> read1(@RequestBody RequestParams params) {
        Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());

        // Wrap the emitted list in a 200 OK response once it arrives.
        return result
                .map(e -> new ResponseEntity<List<Payload>>(e, HttpStatus.OK));
    }
    

    the understanding I was seeking was the proper way to compose a chain of calls in WebFlux, whereby the response from one of its operators/legs (materialized as the result of a WebClient call producing a set of records, as above) could be passed downstream to another operator/leg to perform a side effect such as saving those records in a DB.

    Would it be a good idea to model each of those steps as a separate REST endpoint, with another endpoint for a composition operation that internally calls each independent endpoint in the right order, or would other design choices be preferable?

    That is ultimately the understanding I was looking for, so if anyone wants to share example code as well as opinions on how to better implement the steps described above, I'm willing to accept the most comprehensive answer.

    Thank you.
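
The read-then-save chain described above can be sketched with flatMap(), which passes the emitted list to a second asynchronous step and flattens that step's Mono into the chain. This is only an illustration under stated assumptions: ReadThenSaveSketch, read(), saveAll() and the in-memory STORE are hypothetical (a real application would more likely call a reactive repository), String stands in for Payload, and only reactor-core is required.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReadThenSaveSketch {

    // Hypothetical in-memory stand-in for a database table.
    static final List<String> STORE = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for reader.read(...), i.e. the WebClient leg.
    static Mono<List<String>> read() {
        return Flux.just("p1", "p2").collectList();
    }

    // Hypothetical stand-in for a reactive save. Mono.fromCallable defers
    // the write until something subscribes to the returned Mono.
    static Mono<List<String>> saveAll(List<String> payloads) {
        return Mono.fromCallable(() -> {
            STORE.addAll(payloads);
            return payloads;
        });
    }

    // First leg: read. Second leg: save what was read. flatMap is used
    // because saveAll itself returns a Mono.
    static Mono<List<String>> readAndSave() {
        return read().flatMap(ReadThenSaveSketch::saveAll);
    }

    public static void main(String[] args) {
        // Nothing has run yet: building the chain does not execute it.
        Mono<List<String>> chain = readAndSave();
        System.out.println(STORE.isEmpty()); // prints true

        // block() is acceptable here only because main() is not a reactive thread.
        List<String> saved = chain.block();
        System.out.println(saved); // prints [p1, p2]
        System.out.println(STORE); // prints [p1, p2]
    }
}
```

Composing the legs in-process like this keeps both steps in a single subscription, so an error in either leg propagates down the same chain. Whether that is preferable to a separate REST endpoint per step depends on whether the steps need to be callable independently.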