Tags: java, project-reactor, spring2.x

Flux endpoint from an infinite Java stream


I have an issue processing a Flux that is built from a Stream.generate construct.

The Java stream fetches data from a remote source, so I implemented a custom Supplier with the data-fetching logic embedded and used it to populate the Stream:

Stream.generate(new SearchSupplier(...))

My idea is to detect an empty list and stop the stream there, using Java 9's takeWhile:

Stream.generate(new SearchSupplier(this, queryBody))
            .takeWhile(either -> either.isRight() && either.get().nonEmpty())

(using Vavr's Either construct)
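To check the termination behaviour of the stream in isolation, here is a minimal sketch that uses only the standard library. `PagedSource` is a hypothetical stand-in for `SearchSupplier`, and plain lists replace the Vavr `Either` values; it only shows that a sequential `Stream.generate(...)` ends once `takeWhile` sees the first empty page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TakeWhileDemo {

    // Hypothetical stand-in for SearchSupplier: returns pages of a counter
    // and an empty list once `total` items have been handed out.
    static class PagedSource implements Supplier<List<Integer>> {
        private final int total;
        private final int pageSize;
        private int cursor = 0;

        PagedSource(int total, int pageSize) {
            this.total = total;
            this.pageSize = pageSize;
        }

        @Override
        public List<Integer> get() {
            List<Integer> page = new ArrayList<>();
            while (cursor < total && page.size() < pageSize) {
                page.add(cursor++);
            }
            return page; // empty once the source is exhausted
        }
    }

    // Flattens pages until the first empty one; takeWhile requires Java 9+.
    // The stream is sequential, so pages are consumed in the order they are generated.
    static List<Integer> fetchAll(int total, int pageSize) {
        return Stream.generate(new PagedSource(total, pageSize))
                .takeWhile(page -> !page.isEmpty())
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(5, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```

If a sketch like this terminates but the real stream does not, the end-of-data condition of the real supplier is worth checking first.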

The repository layer then builds the Flux:

return Flux.fromStream (
            this.searchStream(...) //this is where the stream gets generated
        )
        .map(Either::get)
        .flatMap(Flux::fromIterable);

The "service" layer is composed of some transformation steps on the flux, but the method signature is something like Flux<JsonObject> search(...).

Finally, the controller layer has a GetMapping:

@GetMapping(produces = "application/stream+json")
public Flux<JsonObject> search(...) {
    return searchService.search(...) // this is the Flux<JsonObject> part
         .subscriberContext(...) // stuff I need available during processing
         .doOnComplete(() -> log.debug("DONE"));
}

My problem is that the Flux never seems to terminate. A call from Postman, for example, just shows 'Loading...' in the response section. When I terminate the process from my IDE, the results are flushed to Postman and I see what I'm expecting. Also, the doOnComplete lambda never gets called.

What I noticed is that if I change the source of the Flux to:

Flux.fromArray(...) // hardcoded array of lists of JSON objects

the doOnComplete lambda is called, the HTTP connection closes, and the results are displayed in Postman.

Any idea of what might be the issue?

Thanks.


Solution

  • You could create the Flux directly with Flux.generate, using code that looks like this. Note that I'm adding some assumed methods (next(), isNotLast and anyCleanupOperations) which you would need to implement based on how your SearchSupplier works:

    Flux<SearchResultType> flux = Flux.generate(
                () -> new SearchSupplier(this, queryBody),
                (supplier, sink) -> {
                    SearchResultType current = supplier.next();
                    if (isNotLast(current)) {
                        sink.next(current);
                    } else {
                        sink.complete();
                    }
                    return supplier;
                },
                supplier -> anyCleanupOperations(supplier)
            );
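The answer leaves `next()`, `isNotLast` and `anyCleanupOperations` to the reader. One possible shape is sketched below in plain Java, assuming a page is a list and an empty page marks the end of the data. All names and the in-memory pages are hypothetical, and the `drain` loop only imitates how a generator callback is invoked repeatedly until it signals completion; it is not Reactor code.

```java
import java.util.ArrayList;
import java.util.List;

class SearchSupplierSketch {

    // Hypothetical supplier: hands out pages and records whether it was closed.
    static class SearchSupplier {
        private final List<List<String>> pages;
        private int index = 0;
        boolean closed = false;

        SearchSupplier(List<List<String>> pages) {
            this.pages = pages;
        }

        // Returns the next page, or an empty list when nothing is left.
        List<String> next() {
            return index < pages.size() ? pages.get(index++) : List.of();
        }

        // Plays the role of anyCleanupOperations(supplier).
        void close() {
            closed = true;
        }
    }

    // Assumed end-of-data rule: an empty page means the search is exhausted.
    static boolean isNotLast(List<String> page) {
        return !page.isEmpty();
    }

    // Imitates the generator: one page per step, stop on the first empty page, then clean up.
    static List<String> drain(SearchSupplier supplier) {
        List<String> out = new ArrayList<>();
        try {
            for (List<String> page = supplier.next(); isNotLast(page); page = supplier.next()) {
                out.addAll(page);
            }
        } finally {
            supplier.close();
        }
        return out;
    }

    public static void main(String[] args) {
        SearchSupplier s = new SearchSupplier(List.of(List.of("a", "b"), List.of("c")));
        System.out.println(drain(s)); // prints [a, b, c]
    }
}
```

The point of the design is that the stop condition lives in one explicit check, so completion is signalled as soon as the first empty page arrives instead of depending on the stream pipeline to end.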