Search code examples
spring-webfluxrepeatproject-reactorflux

Repeatedly subscribe based on DB resource with .repeatWhen() in Spring WebFlux


I want to implement repetition of Flux process() based on the state of some resource in the DB. For example, if the array of elements in the resource is not empty, then repeat the process(). It looks like operator repeatWhen will suit my purpose - allow to subscribe to a publisher with resource. Here is a code snippet:

private Consumer<Signal<String>> processOnNewThread() {
    return signal -> {
        final var resourceId = signal.get();
        if (resourceId == null) return;

        this.process(resourceId)
            .repeatWhen(repeat -> Mono.defer(() -> repo.findById(resourceId)
                                                       // filter to end repeat
                                                       .filter(r -> !r.getElems().isEmpty())
                                                       // return Mono with complete signal to repeat
                                                       .map(r -> r.getElems().size())))
            .collectList()
            .contextWrite(stateSignal.getContextView())
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();
    };
}

private Flux<String> process(String resourceId) { ... } 

There are 2 problems in this code:

  1. repo.findById(resourceId) is executed before the process() method, despite the Mono.defer()
  2. When elems is empty, repeat sequence completes with empty signal which leads not to the end of the repetitions, but to the end of the whole process

Any ideas on how to check with a fresh resource and then continue or complete the repeats?


Solution

  • I managed to achieve the desired result by changing operator .repeatWhen as follows:

    .repeatWhen(repeat -> repeat.flatMap(r -> Mono.defer(() -> repo.findById(resourceId)
                                                                   .map(r -> r.getElems().size())))
                                .handle((nextRepeat, sink) -> {
                                    // if elem size > 0 - repeat process
                                    if (nextRepeat > 0) sink.next(nextRepeat);
                                    else sink.complete();
                                }))
    

    Using Flux repeat for the further chain allows the operator Mono.defer() will execute correctly and get a fresh resource at each repeat check. The handle() operator directly performs the prolongation or end of the resubscription. Thus, this solves the problems I am having