
How to map a Flux, stopping when an async condition is satisfied?


Consider a Flux of integers and this method, which simulates an asynchronous external API call that can return an empty response for some inputs that are not known in advance:

public static Mono<String> getApiData(int i) {
    if (i == 3) return Mono.empty(); // i'm using 3 just as an example
    return Mono.just(String.valueOf(i * 2));
}

And these methods, one of which is executed depending on what getApiData returns:

// when getApiData returns non empty mono
public static Mono<Boolean> updateDatabaseWithApiData(int apiInput, String apiOutput) {
    System.out.println(apiInput + " -> " + apiOutput);
    // lots of unrelated logic
    return Mono.just(true);
}

// when getApiData returns empty mono
public static Mono<Boolean> logFailure(int apiInput) {
    System.out.println(apiInput + " -> failure");
    // registering errors logs
    return Mono.just(false);
}

With these, I would like to write a method Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux) that applies getApiData to each element and stops when a failure occurs. If at least one element reaches updateDatabaseWithApiData, it should return Mono.just(true); otherwise, Mono.just(false).

For example, given this main method:

public static void main(String[] args) {
    Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
    processFluxUntilFailure(flux).subscribe(value -> System.out.println("result " + value));
}

Desired output:

1 -> 2
2 -> 4
3 -> failure
result true

The result is true because 2 elements (at least 1 is needed) were processed successfully.

Considering:

  • This is a simplified version of my real problem
  • I can't predict when getApiData will return empty
  • I can't change the methods described above, only processFluxUntilFailure

I tried this:

public static Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux) {
    return flux.flatMap(apiInput -> getApiData(apiInput)
                    .flatMap(apiOutput -> updateDatabaseWithApiData(apiInput, apiOutput))
                    .switchIfEmpty(Mono.defer(() -> logFailure(apiInput)))
            )
            .reduce((b1, b2) -> b1 || b2);
}

This resulted in:

1 -> 2
2 -> 4
3 -> failure
4 -> 8
5 -> 10
result true

How can I get the desired output starting from this attempt? In other words, how can I "stop" flatMap once some async condition is satisfied?


Solution

  • You can use takeWhile for this use-case.

    takeWhile short-circuits the flow as soon as it receives an "invalid" value, i.e. one that fails the given predicate. Because it does not include the failing element in its output (unlike takeUntil), it should match your requirement.

    So your function could be written like this:

    public static Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux) {
        return flux.flatMap(apiInput -> getApiData(apiInput)
                        .flatMap(apiOutput -> updateDatabaseWithApiData(apiInput, apiOutput))
                        .switchIfEmpty(Mono.defer(() -> logFailure(apiInput)))
                )
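                // Stop at the first false (the logFailure result); that element is not emitted.
                .takeWhile(Boolean.TRUE::equals)
                // true if at least one element succeeded; false if nothing got through.
                .last(false);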
    }
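
One caveat worth checking against the real API: flatMap subscribes to its inner publishers eagerly, so when getApiData is genuinely asynchronous, later elements may already have been handed to it by the time takeWhile sees the failure. Below is a minimal sketch of the difference, assuming reactor-core is on the classpath. The getApiData here is a hypothetical delayed stand-in for the question's method that also records its inputs. The delay, the extra `invoked` parameter, and the names StopOnFailureSketch, processOne, processWithFlatMap and processWithConcatMap are illustrative, not from the question. concatMap is swapped in as one possible way to process elements strictly one at a time.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class StopOnFailureSketch {

    // Hypothetical stand-in for the question's getApiData: same contract (empty for 3),
    // but delayed so the call is really asynchronous, and it records every input it receives.
    static Mono<String> getApiData(int i, List<Integer> invoked) {
        invoked.add(i);
        Mono<String> data = (i == 3) ? Mono.empty() : Mono.just(String.valueOf(i * 2));
        return data.delaySubscription(Duration.ofMillis(20));
    }

    // Simplified versions of the question's two follow-up methods (printing omitted).
    static Mono<Boolean> updateDatabaseWithApiData(int apiInput, String apiOutput) {
        return Mono.just(true);
    }

    static Mono<Boolean> logFailure(int apiInput) {
        return Mono.just(false);
    }

    // Per-element step, same shape as the lambda body in the answer.
    static Mono<Boolean> processOne(int apiInput, List<Integer> invoked) {
        return getApiData(apiInput, invoked)
                .flatMap(apiOutput -> updateDatabaseWithApiData(apiInput, apiOutput))
                .switchIfEmpty(Mono.defer(() -> logFailure(apiInput)));
    }

    // The answer's pipeline: flatMap maps and subscribes to inner Monos as elements arrive.
    static Mono<Boolean> processWithFlatMap(Flux<Integer> flux, List<Integer> invoked) {
        return flux.flatMap(i -> processOne(i, invoked))
                .takeWhile(Boolean.TRUE::equals)
                .last(false);
    }

    // Variant (an assumption, not part of the answer): concatMap waits for each
    // inner Mono to complete before mapping the next element.
    static Mono<Boolean> processWithConcatMap(Flux<Integer> flux, List<Integer> invoked) {
        return flux.concatMap(i -> processOne(i, invoked))
                .takeWhile(Boolean.TRUE::equals)
                .last(false);
    }

    public static void main(String[] args) {
        List<Integer> viaFlatMap = new CopyOnWriteArrayList<>();
        processWithFlatMap(Flux.just(1, 2, 3, 4, 5), viaFlatMap).block();
        System.out.println("flatMap called getApiData for " + viaFlatMap);     // [1, 2, 3, 4, 5]

        List<Integer> viaConcatMap = new CopyOnWriteArrayList<>();
        Boolean result = processWithConcatMap(Flux.just(1, 2, 3, 4, 5), viaConcatMap).block();
        System.out.println("concatMap called getApiData for " + viaConcatMap); // [1, 2, 3]
        System.out.println("result " + result);                                // result true
    }
}
```

With the question's synchronous getApiData the two pipelines should behave the same; the distinction only matters once the inner calls take time. In the delayed flatMap case the final boolean is deliberately not shown, because the inner Monos complete concurrently and the order in which takeWhile sees them is not guaranteed.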