Search code examples
javaspring-webfluxproject-reactorreactor

How to create a Mono triggered by completion of a flux


I have a class that populates a Map from a Flux source. I want to able to updateTheData() on the fly, but I want getTheData() to be able to return a Mono that resolves to current or pending data.

Basically if theData != null and no in progress flux, return Mono.just(theData) Otherwise return mono that will eventually emit theData.

Edit: This is the best I can do so far

    class SomeClass {
        private Mono<Map<String, Data>> theData;
        private final SomeFluxService someFluxService = new SomeFluxService();

        public SomeClass() {
            updateTheData();
        }

        public void updateTheData() {
            someFluxService.get()
                .collectMap(Data::getId, Function.identity())
                .subscribe(d -> this.theData = Mono.just(d));
        }

        public Mono<Map<String, Data>> getTheData() {
            return this.theData;
        }
    }

But there is still the problem of before updateTheData() completes for the first time, getTheData() will return null

Help appreciated!


Solution

  • But there is still the problem of before updateTheData() completes for the first time, getTheData() will return null

    This is because you're using a consumer in your subscribe method to update a Mono only when data is emitted. This is a bit of an odd approach, it would be better to simply cache() the Mono and then assign it to your field straight away:

    public void updateTheData() {
        theData = someFluxService.get()
                .collectMap(Data::getId, Function.identity())
                .cache();
    }
    

    Taking that thinking a step further, if your requirement is to update the data every x seconds rather than inherently on demand, you can just pass that into the cache function and do away with the separate method entirely:

    public NewClass() {
        theData = someFluxService.get()
                .collectMap(Data::getId, Function.identity())
                .cache(Duration.ofMinutes(5));
    }
    
    public Mono<Map<String, Data>> getTheData() {
        return theData;
    }