I want to implement repetition of Flux process() based on the state of some resource in the DB. For example, if the array of elements in the resource is not empty, then repeat the process(). It looks like operator repeatWhen will suit my purpose - allow to subscribe to a publisher with resource. Here is a code snippet:
private Consumer<Signal<String>> processOnNewThread() {
return signal -> {
final var resourceId = signal.get();
if (resourceId == null) return;
this.process(resourceId)
.repeatWhen(repeat -> Mono.defer(() -> repo.findById(resourceId)
// filter to end repeat
.filter(r -> !r.getElems().isEmpty())
// return Mono with complete signal to repeat
.map(r -> r.getElems().size())))
.collectList()
.contextWrite(stateSignal.getContextView())
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
};
}
private Flux<String> process(String resourceId) { ... }
There are 2 problems in this code:
Any ideas on how to check with a fresh resource and then continue or complete the repeats?
I managed to achieve the desired result by changing operator .repeatWhen as follows:
.repeatWhen(repeat -> repeat.flatMap(r -> Mono.defer(() -> repo.findById(resourceId)
.map(r -> r.getElems().size())))
.handle((nextRepeat, sink) -> {
// if elem size > 0 - repeat process
if (nextRepeat > 0) sink.next(nextRepeat);
else sink.complete();
}))
Using Flux repeat for the further chain allows the operator Mono.defer() will execute correctly and get a fresh resource at each repeat check. The handle() operator directly performs the prolongation or end of the resubscription. Thus, this solves the problems I am having