Search code examples
javamonoreactive-programmingspring-webfluxflux

Send/Update reactor Mono Object to different attributes of a Flux.concat()


I have a Flux.concat() operation which takes 3 checks like this:


public Mono<Boolean> checkSleep(Student std) 
{

Flux.concat(isHealthy(std), isWealthy(std, sleep), isWise(std, sleep))

.filter(result -> !result)
            .next()
            .flatMap(result -> Mono.just(false)) //returns false if any one is false
            .switchIfEmpty(Mono.just(true)); // returns true if all are true

}

each of this methods has a common external api call extService.getSleep(Student std) to get Sleep Object Mono<Sleep> for its flow.

I want to call extService.getSleep(Student std) only once in the entire flow,

ideally in the first check isHealthy(std) and pass the object Mono<Sleep> to the next 2 checks as well.

I am not understanding how to make this call as Flux.concat does not allow a Mono to be added in the prefix.

Each of the checks have similar body like this:

Mono<Boolean> isHealthy(Student std) 
  {
    
    return Mono.just(std)
                 .flatMap(std->extService.getSleep(std)).map(sleep-> sleep.isValid()); 
  }

in the next check I want to pass sleep object from previous method,

isWealthy(Student std, Sleep sleep) I do not want to call extService.getSleep(std)) once again,

I thought of creating a variable outside these 3 methods and update it when the api returns a something,

it throws error saying "Variable used in lambda expression should be final or effectively final"

Let me know if there is a better way to handle this scenario.

I am new to reactive spring programming, any help is appreciated.

thanks in advance.


Solution

  • Your line of thinking was not far off!

    Whenever you need to "reach outside" a publisher, consider using AtomicBoolean, AtomicInteger, etc. or the parameterized AtomicReference to get around the final or effectively final compiler warning. However, it should be noted that asynchronous operations like flatMap may not have the correct value when they get the wrapped values from these, so it's best to get around the problem in a different way.

    Fortunately, Reactor has a myriad of useful methods on its publishers!

    If I understand correctly, the checkSleep function should resolve to true if all three of isHealthy, isWealthy and isWise also resolve to true - false if even one of them resolve to false.

    I have created a simple simulation of this scenario:

    private Mono<Boolean> checkSleep(Student std) {
        return getSleep(std)
            .flatMapMany(sleep -> Flux.merge(isHealthy(std, sleep), isWealthy(std, sleep), isWise(std, sleep)))
            .all(result -> result);
    }
    
    private Mono<Sleep> getSleep(Student std) {
        return Mono.just(new Sleep(8));
    }
    
    private Mono<Boolean> isHealthy(Student std, Sleep sleep) {
        return Mono.just(true);
    }
    
    private Mono<Boolean> isWealthy(Student std, Sleep sleep) {
        return Mono.just(true);
    }
    
    private Mono<Boolean> isWise(Student std, Sleep sleep) {
        return Mono.just(true);
    }
    

    This way, getSleep is only called once, and is used to flat map the emitted value into the three booleans you're looking for. The Flux::all method then ensures that the returned Mono will wrap true only if all three inners have emitted true.

    Another thing to note is that I've replaced Flux::concat with Flux::merge. The former goes sequentially, subscribing, waiting for result, then repeat. These three publishers seem to be independent of one another, so replacing concat with merge allows all three to be subscribed to at the same time, thereby reducing time wasted with waiting.