spring-boot spring-webflux project-reactor reactor

Webflux Reactor - Checking if all items in the original Flux were successful


I currently have this Reactor code, and I'm not sure I'm doing this the idiomatic way.

My requirement is that, for each accountId in a list, I make two requests one after the other: the first deletes the account data, and the second triggers an event. The second request is only made if the first one succeeds.

At the end, I would like to know whether all of the sets of requests were successful. I have achieved this with the code below.

List<String> accountIdsToForget = List.of("accountId", "someOtherAccountId");

Flux.fromIterable(accountIdsToForget)
            .flatMap(accountId -> someWebclient.deleteAccountData(accountId)
                .doOnSuccess(response -> log.info("Delete account data success"))
                .onErrorResume(e -> {
                    log.info("Delete account data failure");
                    return Mono.empty();
                })
                .flatMap(deleteAccountDataResponse -> {
                    return eventServiceClient.triggerEvent("deleteAccountEvent")
                        .doOnSuccess(response -> log.info("Delete account event success"))
                        .onErrorResume(e -> {
                            log.info("Delete account event failure");
                            return Mono.empty();
                        });
                }))
            .count()
            .subscribe(items -> {
                if (items.intValue() == accountIdsToForget.size()) {
                    log.info("All accountIds deleted and events triggered successfully");
                } else {
                    log.info("Not all accountIds deleted and events triggered successfully");
                }
            });

Is there a better way to achieve this?

As the web clients can return errors for 4xx and 5xx responses, I have to swallow them with onErrorResume to prevent the error from bubbling up. Similarly, the only way I have found to check whether all of the accountIds were processed is to compare the count of the Flux against the size of the List it was created from.


Solution

  • Disclaimer: what counts as a better solution is somewhat subjective. In this answer, I will present my personal choice of error handling, which, in my opinion, provides the best extensibility and readability.

    I would model a result/report object (somewhat like Either in the functional paradigm), so that each success or error is sent downstream as a "next" signal.

    It requires a little more code/boilerplate, but the benefit is that we end up with a flow of successes and failures produced on the fly. This makes it possible to detect errors early, and it eases both error recovery and pipeline extensibility (for example, it becomes very easy to switch between fail-fast and error-silencing strategies, or to build complex reports from upstream results).

    Let's try to apply this to your example. For simplicity, I will mock the deletion and notification services with two methods that return an empty result on success:

    static Mono<Void> delete(String account) {
        if (account.isBlank()) return Mono.error(new IllegalArgumentException("EMPTY ACCOUNT !"));
        else return Mono.empty();
    }
    
    static Mono<Void> notify(String event) {
        if (event.isBlank()) return Mono.error(new IllegalArgumentException("UNKNOWN EVENT !"));
        return Mono.empty();
    }
    

    I would take these steps:

    1. Create a result model:
      sealed interface Result { String accountId(); }
      sealed interface Error extends Result { Throwable cause(); }
      record DeletionError(String accountId, Throwable cause) implements Error {}
      record NotifyError(String accountId, Throwable cause) implements Error {}
      record Success(String accountId) implements Result {}
      
    2. Then prepare a pipeline that wraps the delete and notify operations so that they produce result objects:
      static Flux<Result> deleteAndNotify(Flux<String> accounts) {
          Function<String, Mono<Result>> safeDelete = account
                  -> delete(account)
                  .<Result>thenReturn(new Success(account))
                  .onErrorResume(err -> Mono.just(new DeletionError(account, err)));
      
          Function<Result, Mono<Result>> safeNotify = deletionResult -> deletionResult instanceof Success
                  ? notify("deleteAccountEvent")
                      .thenReturn(deletionResult)
                      .onErrorResume(err -> Mono.just(new NotifyError(deletionResult.accountId(), err)))
                  : Mono.just(deletionResult);
      
          return accounts.flatMap(safeDelete)
                         .flatMap(safeNotify);
      }
      

    With the code above, you can already receive errors as they arrive. A simple program:

    var results = deleteAndNotify(Flux.just("a1", "a2", "  ", "a3"));
    results.subscribe(System.out::println);    
    

    prints:

    Success[accountId=a1]
    Success[accountId=a2]
    DeletionError[accountId=  , cause=java.lang.IllegalArgumentException: EMPTY ACCOUNT !]
    Success[accountId=a3]
    

    Now, it becomes very simple to adapt your flow of control:

    • To keep track of errors only, chain a simple filter: results.filter(it -> it instanceof Error)
    • To fail fast, map each error result to a real error signal: results.flatMap(result -> result instanceof Error err ? Mono.error(err.cause()) : Mono.just(result))
    • To get an idea of the flow's throughput, time it: results.timed()
    • etc.
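
    The first two strategies depend only on the shape of the result model, not on Reactor itself. As a minimal sketch that runs without Reactor on the classpath, the same two views can be written over a plain List with java.util.stream. The class name ResultStrategies and the helpers errorsOnly and firstFailure are hypothetical, and the Error interface is renamed Failure here only to avoid shadowing java.lang.Error:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, Reactor-free illustration of the "errors only" and "fail fast" views.
final class ResultStrategies {
    // Same shape as the result model above (Error renamed to Failure).
    sealed interface Result { String accountId(); }
    sealed interface Failure extends Result { Throwable cause(); }
    record DeletionError(String accountId, Throwable cause) implements Failure {}
    record NotifyError(String accountId, Throwable cause) implements Failure {}
    record Success(String accountId) implements Result {}

    // Error-silencing view: keep only the failures, in encounter order.
    static List<Failure> errorsOnly(List<Result> results) {
        return results.stream()
                .filter(r -> r instanceof Failure)
                .map(r -> (Failure) r)
                .toList();
    }

    // Fail-fast view: the cause of the first failure, or empty if everything succeeded.
    static Optional<Throwable> firstFailure(List<Result> results) {
        return results.stream()
                .filter(r -> r instanceof Failure)
                .map(r -> ((Failure) r).cause())
                .findFirst();
    }
}
```

    The point of the sketch is that switching strategy is a change at the consuming end only; the code that produces the results stays untouched.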

    And if you want to count, you can now directly count errors and successes on the fly. It provides a few advantages:

    • You are not forced to know the number of accounts to delete in advance to verify if any error happened
    • You can have a live monitoring of the failed/succeeded operations

    We can implement the counting like this:

    record Count(long success, long deleteFailed, long notifyFailed) {
        Count() { this(0, 0, 0); }
    
        Count newSuccess() { return new Count(success + 1, deleteFailed, notifyFailed); }
        Count newDeletionFailure() { return new Count(success, deleteFailed + 1, notifyFailed); }
        Count newNotifyFailure() { return new Count(success, deleteFailed, notifyFailed + 1); }
    }
    
    var counting = results.scanWith(Count::new, (count, result) -> switch (result) {
        case Success s -> count.newSuccess();
        case DeletionError de -> count.newDeletionFailure();
        case NotifyError ne -> count.newNotifyFailure();
    });
    

    Subscribing to this counting flow using the same input accounts as above would produce this kind of output:

    Count[success=0, deleteFailed=0, notifyFailed=0]
    Count[success=1, deleteFailed=0, notifyFailed=0]
    Count[success=2, deleteFailed=0, notifyFailed=0]
    Count[success=2, deleteFailed=1, notifyFailed=0]
    Count[success=3, deleteFailed=1, notifyFailed=0]
    

    If you only want the total count, either use counting.last() or replace scanWith with the reduceWith operator.
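
    To tie the total back to the original question ("were all requests successful?"), here is a minimal, Reactor-free sketch of the same fold over a plain List. The class name FinalCount and the methods add, allSucceeded and total are hypothetical additions, and the result model is flattened (no intermediate Error interface) to keep the example short:

```java
import java.util.List;

// Hypothetical sketch: fold all results into a single final Count.
final class FinalCount {
    sealed interface Result {}
    record Success(String accountId) implements Result {}
    record DeletionError(String accountId, Throwable cause) implements Result {}
    record NotifyError(String accountId, Throwable cause) implements Result {}

    record Count(long success, long deleteFailed, long notifyFailed) {
        Count() { this(0, 0, 0); }

        // Same accumulation logic as the scanWith lambda, as a method on Count.
        Count add(Result result) {
            return switch (result) {
                case Success s -> new Count(success + 1, deleteFailed, notifyFailed);
                case DeletionError de -> new Count(success, deleteFailed + 1, notifyFailed);
                case NotifyError ne -> new Count(success, deleteFailed, notifyFailed + 1);
            };
        }

        // True when no failure of either kind was recorded.
        boolean allSucceeded() { return deleteFailed == 0 && notifyFailed == 0; }
    }

    // Reduce: only the last accumulated value is returned.
    static Count total(List<Result> results) {
        Count count = new Count();
        for (Result result : results) {
            count = count.add(result);
        }
        return count;
    }
}
```

    In the Reactor pipeline, the equivalent would presumably be results.reduceWith(Count::new, Count::add).map(Count::allSucceeded), which yields a Mono<Boolean> without comparing against the size of the input list.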

    I hope this answer helps you better model pipelines/DAGs/flows of operations.