
WebFlux: onErrorResume is not triggered after repeats are exhausted


I am trying to execute some code after the repeats are exhausted, using onErrorResume, but onErrorResume is not being triggered.

Here is the code sample:

Mono.just(request)
                .filter(this::isConditionSatified)
                .map(aBoolean -> performSomeOperationIfConditionIsSatified(request))
                .repeatWhenEmpty(Repeat.onlyIf(i -> true)
                        .exponentialBackoff(Duration.ofSeconds(5), Duration.ofSeconds(10))
                        .timeout(Duration.ofSeconds(30)))
                .delaySubscription(Duration.ofSeconds(10))
                .onErrorResume(throwable -> {
                    log.warn("Max timeout reached", throwable);
                    return Mono.just(false);
                });

onErrorResume is never triggered. I am trying to use it as a fallback: my goal is to return false once the repeats are exhausted.

My unit test fails with:

expectation "expectNext(false)" failed (expected: onNext(false); actual: onComplete())

Any help or suggestions would be appreciated.


Solution

  • Since an empty source is valid by itself, repeatWhenEmpty doesn't necessarily propagate an exception after exhausting its attempts. The Repeat util from addons doesn't, even when the "timeout" triggers (as hinted in the timeout parameter's javadoc: "timeout after which no new repeats are initiated", though that could be clearer). The sequence simply completes empty, which is why your test sees onComplete() and onErrorResume never runs.

    Since you're using repeatWhenEmpty, I'm guessing that the empty case is always "irrelevant" to you, so defaultIfEmpty(false) should be an acceptable solution.
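
    A minimal sketch of that suggestion, using only reactor-core so it stays self-contained. The class name, the method names, and the always-false condition are hypothetical stand-ins, and a companion that completes after a few signals is assumed here as a substitute for the Repeat util giving up. It is meant to show the shape of the fix, not the exact pipeline from the question.

```java
import reactor.core.publisher.Mono;

public class RepeatFallbackSketch {

    // Hypothetical stand-in for the question's condition check; never satisfied here.
    static boolean isConditionSatisfied(String request) {
        return false;
    }

    // Repeats while the source is empty. When the companion completes
    // (here after take(3)), the result completes empty instead of erroring,
    // so onErrorResume has nothing to react to.
    static Mono<Boolean> pollWithoutFallback(String request) {
        return Mono.just(request)
                .filter(RepeatFallbackSketch::isConditionSatisfied)
                .map(r -> true)
                .repeatWhenEmpty(companion -> companion.take(3))
                .onErrorResume(t -> Mono.just(false));
    }

    // Same pipeline, with defaultIfEmpty supplying the fallback value
    // for the "completed empty" case.
    static Mono<Boolean> pollWithFallback(String request) {
        return pollWithoutFallback(request).defaultIfEmpty(false);
    }

    public static void main(String[] args) {
        // block() returns null when a Mono completes empty.
        System.out.println(pollWithoutFallback("req").block()); // prints "null"
        System.out.println(pollWithFallback("req").block());    // prints "false"
    }
}
```

    With defaultIfEmpty(false) in place, a StepVerifier expectation of expectNext(false) followed by verifyComplete() should match. onErrorResume can stay in the chain if real errors from the operation should also map to false.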