How to call a non-reactive blocking method in the API pipeline, but call it only if there is at least one item present in the Flux?
For example, before calling itemsRepo.saveItem
, I want to call setupRepo
, but only if at least one item is emitted from the Flux returned by externalApi.getAllItems()
.
externalApi
.getAllItems()
.flatMap(item ->
itemsRepo
.saveItem(item)
.doOnNext(e -> this.logSaveResult(item, e))
.doOnError(th -> this.logSaveError(item, th))
)
.subscribe();
Here are the relevant method signatures
Flux<Item> getAllItems();
Mono<Boolean> saveItem(Item item);
// The following are non reactive
private void logSaveResult(Item item, Boolean isSuccess);
private void logSaveError(Item item, Throwable th);
void setupRepo();
Here is the complete code if you want to test it, but this repo has a little bit more than related to this question. https://github.com/mps-learning/cassandra-save
You can use switchOnFirst
like:
externalApi
.getAllItems()
.switchOnFirst( (signal, flux) ->
if (signal.hasValue()) {
setupRepo();
}
return flux;
)
.flatMap(item ->
itemsRepo
.saveItem(item)
.doOnNext(e -> this.logSaveResult(item, e))
.doOnError(th -> this.logSaveError(item, th))
)
.subscribe();