consider i have a flux of integers and this method simulating an async external api data retrieval, which can return an empty response for some specific unknown input:
public static Mono<String> getApiData(int i) {
if (i == 3) return Mono.empty(); // i'm using 3 just as an example
return Mono.just(String.valueOf(i * 2));
}
and these methods to be executed with getApiData
output, depending on the result:
// when getApiData returns non empty mono
public static Mono<Boolean> updateDatabaseWithApiData(int apiInput, String apiOutput) {
System.out.println(apiInput + " -> " + apiOutput);
// lots of unrelated logic
return Mono.just(true);
}
// when getApiData returns empty mono
public static Mono<Boolean> logFailure(int apiInput) {
System.out.println(apiInput + " -> failure");
// registering errors logs
return Mono.just(false);
}
with this i would like to write a method like this Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux)
that applies getApiData
for each element and stops when a failure occurs. then if at least a single element reaches updateDatabaseWithApiData
, return Mono.just(true)
, Mono.just(false)
otherwise.
so i would get this output:
public static void main(String[] args) {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
processFluxUntilFailure(flux).subscribe(value -> System.out.println("result " + value));
}
desired output:
1 -> 2
2 -> 4
3 -> failure
result true
since we had (at least 1) 2 successful elements processed.
considering:
getApiData
processFluxUntilFailure
i tried this:
public static Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux) {
return flux.flatMap(apiInput -> getApiData(apiInput)
.flatMap(apiOutput -> updateDatabaseWithApiData(apiInput, apiOutput))
.switchIfEmpty(Mono.defer(() -> logFailure(apiInput)))
)
.reduce((b1, b2) -> b1 || b2);
}
which resulted in
1 -> 2
2 -> 4
3 -> failure
4 -> 8
5 -> 10
result true
how can i achieve my desired output from this try? in other words, how can i "stop" flatMap
if some async condition is satisfied?
You can use takeWhile for this use-case.
takeWhile short your flow execution upon receiving an "invalid value", i.e once it fails a certain test/predicate. As it does not include the failed element in the output (contrary to takeUntil), it should match your requirement.
So, your function could be expressed like that:
public static Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux) {
return flux.flatMap(apiInput -> getApiData(apiInput)
.flatMap(apiOutput -> updateDatabaseWithApiData(apiInput, apiOutput))
.switchIfEmpty(Mono.defer(() -> logFailure(apiInput)))
)
.takeWhile(Boolean.TRUE::equals)
.last(false);
}