java, reactive-programming, project-reactor, flux

Call a method right before starting to consume Flux elements


How can I call a non-reactive, blocking method in a reactive pipeline, but only if the Flux emits at least one item?

For example, I want to call setupRepo before the first call to itemsRepo.saveItem, but only if the Flux returned by externalApi.getAllItems() emits at least one item.

externalApi
    .getAllItems()
    .flatMap(item -> 
          itemsRepo
             .saveItem(item)
             .doOnNext(e -> this.logSaveResult(item, e))
             .doOnError(th -> this.logSaveError(item, th))
    )
    .subscribe();

Here are the relevant method signatures:

Flux<Item> getAllItems();
Mono<Boolean> saveItem(Item item);
// The following are non-reactive
private void logSaveResult(Item item, Boolean isSuccess);
private void logSaveError(Item item, Throwable th);
void setupRepo();

The complete code is available if you want to test it, although the repo contains more than what is relevant to this question: https://github.com/mps-learning/cassandra-save


Solution

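  • You can use switchOnFirst. Its transformer receives the first signal of the sequence, and signal.hasValue() is true only when that signal is an onNext, so setupRepo is skipped for an empty Flux: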

    externalApi
        .getAllItems()
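        .switchOnFirst((signal, flux) -> {
           // The first signal carries a value only if the source emitted an item
           if (signal.hasValue()) {
              setupRepo();
           }
           // Continue with the original sequence, first item included
           return flux;
        })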
        .flatMap(item -> 
              itemsRepo
                 .saveItem(item)
                 .doOnNext(e -> this.logSaveResult(item, e))
                 .doOnError(th -> this.logSaveError(item, th))
        )
        .subscribe();
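
    To check this behaviour in isolation, here is a minimal sketch that assumes reactor-core is on the classpath. The class SwitchOnFirstDemo, the counter setupCalls, and the helpers withSetupOnFirst and setupCallsFor are hypothetical names introduced for illustration; setupRepo here is only a stand-in that counts its invocations, not the real repository setup.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;
    import reactor.core.publisher.Flux;

    public class SwitchOnFirstDemo {
        // Hypothetical stand-in for the blocking setupRepo(): it only counts calls.
        static final AtomicInteger setupCalls = new AtomicInteger();

        static void setupRepo() {
            setupCalls.incrementAndGet();
        }

        // Wraps a source so that setupRepo() runs once, when the first item arrives.
        static <T> Flux<T> withSetupOnFirst(Flux<T> source) {
            return source.switchOnFirst((signal, flux) -> {
                // hasValue() is false when the first signal is onComplete or onError
                if (signal.hasValue()) {
                    setupRepo();
                }
                return flux;
            });
        }

        // Consumes a Flux built from the given items and reports how often setup ran.
        static int setupCallsFor(String... items) {
            setupCalls.set(0);
            withSetupOnFirst(Flux.fromArray(items)).blockLast();
            return setupCalls.get();
        }

        public static void main(String[] args) {
            System.out.println(setupCallsFor("a", "b", "c")); // prints 1
            System.out.println(setupCallsFor());              // prints 0
        }
    }
    ```

    Note that setupRepo runs on whichever thread delivers the first signal. If the real method blocks for long, that thread is held up for the duration, so it may be worth publishing the source on a scheduler intended for blocking work.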