Tags: spring-boot, reactive-programming, spring-webflux, openapi, openapi-generator

Start processing Flux response from server before completion: is it possible?


I have two reactive Spring Boot apps, a server and a client. The client calls the server like so:

Flux<Thing> things = thingsApi.listThings(5);

And I want to have this as a list for later use:

// "extractContent" operation takes 1.5s per "thing"
List<String> thingsContent = things.map(ThingConverter::extractContent)
        .collect(Collectors.toList())
        .block();

On the server side, the endpoint definition looks like this:

@Override
public Mono<ResponseEntity<Flux<Thing>>> listThings(
        @NotNull @Valid @RequestParam(value = "nbThings") Integer nbThings,
        ServerWebExchange exchange
) {
    // "getThings" operation takes 1.5s per "thing"
    Flux<Thing> things = thingsService.getThings(nbThings);
    return Mono.just(new ResponseEntity<>(things, HttpStatus.OK));
}

The signature comes from the OpenAPI-generated code (Spring Boot server, reactive mode).


What I observe: the client jumps to things.map immediately but only starts processing the Flux after the server has finished sending all the "things".

What I would like: the server should send the "things" as they are generated so that the client can start processing them as they arrive. Since both sides take 1.5s per "thing", overlapping the two would come close to halving the total processing time.
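
To put numbers on that expectation, here is a rough timing sketch. The class and method names are made up for illustration, and it assumes the two stages overlap perfectly with a constant cost per "thing"; the 5 things and 1.5s figures come from the snippets above.

```java
// Back-of-the-envelope timing only; StreamingTiming is a hypothetical helper,
// not part of the project.
final class StreamingTiming {

    // Current behaviour: the client starts only after the server has produced everything.
    static long bufferedMillis(int things, long serverMillis, long clientMillis) {
        return things * serverMillis + things * clientMillis;
    }

    // Desired behaviour: the client processes thing i while the server produces thing i+1.
    // The slower stage dominates; the faster stage adds the cost of a single item.
    static long streamedMillis(int things, long serverMillis, long clientMillis) {
        if (things == 0) {
            return 0;
        }
        return Math.min(serverMillis, clientMillis)
                + things * Math.max(serverMillis, clientMillis);
    }

    public static void main(String[] args) {
        System.out.println(bufferedMillis(5, 1500, 1500)); // prints 15000
        System.out.println(streamedMillis(5, 1500, 1500)); // prints 9000
    }
}
```

So for 5 things the ideal gain is 15s down to 9s; the ratio only approaches one half as the number of things grows.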

Is there a way to achieve this? I've found many tutorials online for the server part, but none with a Java client. I've heard of server-sent events, but can my goal be achieved using a "classic" OpenAPI endpoint definition that returns a Flux?

The problem seemed too complex to fit a minimal reproducible example in the question body; the full code is available for reference on GitHub.

EDIT: the link now points to the main branch, since the proposed solution has been merged.


Solution

  • I got it running by changing two things:

    • First, I changed the content type of the response of your /things endpoint to:
    content:
      text/event-stream
    

    Don't forget to also change the default response; otherwise the client will expect application/json and will wait for the whole response.

    • Second, I changed the return value of ThingsService.getThings to this.getThingsFromExistingStream (the method you had commented out).

    I pushed my changes to a new branch, fix-flux-response, in your GitHub repository, so you can test them directly.
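
If you want to check what actually arrives on the wire once the endpoint produces text/event-stream, a minimal sketch with the JDK's own HttpClient could look like the following. This is not what the generated thingsApi client does internally; EventStreamClient, the base URL parameter and the handler are assumptions for illustration, and the parsing is deliberately simplified (it only looks at single-line "data:" fields).

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical helper, only meant to observe the event stream without Spring.
final class EventStreamClient {

    // Hands the payload of every "data:" line to the handler as soon as the line is read.
    // Simplification: comments, event ids and multi-line data fields are ignored.
    static void forEachData(Stream<String> lines, Consumer<String> handler) {
        lines.filter(line -> line.startsWith("data:"))
             .map(line -> line.substring("data:".length()).strip())
             .forEach(handler);
    }

    // Calls GET {baseUrl}/things?nbThings=N and forwards each event payload as it arrives.
    // baseUrl is an assumption, e.g. "http://localhost:8080".
    static void listThings(String baseUrl, int nbThings, Consumer<String> handler)
            throws Exception {
        HttpRequest request = HttpRequest
                .newBuilder(URI.create(baseUrl + "/things?nbThings=" + nbThings))
                .header("Accept", "text/event-stream")
                .GET()
                .build();
        // ofLines() exposes the body as a lazily read stream of lines, so the
        // handler can run before the server has finished sending the response.
        HttpResponse<Stream<String>> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofLines());
        try (Stream<String> lines = response.body()) {
            forEachData(lines, handler);
        }
    }
}
```

Calling EventStreamClient.listThings(baseUrl, 5, System.out::println) against the running server should print one payload roughly every 1.5s if the server is really streaming, and everything at once at the end if it is still buffering.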