Reactor: list of Monos, retry on failure


I have a List<Mono<String>>. Each Mono represents an API call whose result I wait for (I/O). The problem is that sometimes a call returns nothing (an empty String), and in that case I need to repeat it.

Now it looks like this:

val firstAskForItemsRetrieved = firstAskForItems.map {
    it["statistic"] = (it["statistic"] as Mono<Map<Any, Any>>).block()
    it
}

I wait for all Monos to finish; then, if a body is empty, I repeat the request:

val secondAskForItems = firstAskForItemsRetrieved
        .map {
            if ((it["statistic"] as Map<Any, Any>).isEmpty()) {
                // repeat the request
                it["statistic"] = getUserItem(userName) // returns a Mono
            } else {
                // already have data: wrap it so every item holds a Mono again
                it["statistic"] = Mono.just(it["statistic"])
            }
            it
        }

And then I block on each item again:

val secondAskForItemsRetrieved = secondAskForItems.map {
    it["statistic"] = (it["statistic"] as Mono<Map<Any, Any>>).block()
    it
}

I can see that this looks ugly.

  • Is there another way to retry a call in a Mono if it fails, without doing it manually?

  • Is blocking on each item the right way to get them all?

  • How can I make the code better?

Thank you.


Solution

  • I believe a couple of operators can help you:

    • For the "wait for all Monos" use case, have a look at the static methods Mono.when and Mono.zip.

      • when only cares about completion: even if some Monos are empty, it signals onComplete once all of them have finished. You don't get the data, however.
      • zip cares about the values and expects every Mono to emit one. When all Monos have emitted a value, it combines the values using the Function you pass in. If any Mono completes empty, the result completes empty as well.
    • To retry the empty Monos, have a look at repeatWhenEmpty. It resubscribes to an empty Mono, so if that Mono is "cold" it restarts the source (e.g. makes another HTTP request). Note that a Mono that emits an empty String or an empty Map is not itself empty, so drop such values first (for example with filter) for repeatWhenEmpty to take effect.
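
Putting the two suggestions together, here is a minimal sketch rather than a definitive implementation. It assumes reactor-core is on the classpath. flakyUserItem is a hypothetical stand-in for the question's getUserItem, and repeatWhileEmpty and fetchAll are names made up for this example. The limit of 3 repeats is arbitrary; according to the Reactor Javadoc, exceeding maxRepeat makes the Mono signal an IllegalStateException.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for getUserItem: a cold Mono that yields an empty map
// on its first `emptyTimes` subscriptions and real data afterwards.
fun flakyUserItem(userName: String, emptyTimes: Int): Mono<Map<String, Any>> {
    val calls = AtomicInteger()
    return Mono.fromCallable {
        if (calls.getAndIncrement() < emptyTimes) emptyMap<String, Any>()
        else mapOf<String, Any>("user" to userName)
    }
}

// Drop empty maps so the Mono completes empty, then resubscribe (which re-runs
// the call) at most maxRepeat times. The companion Flux is returned unchanged,
// so each repeat happens immediately, without a delay.
fun <T : Map<*, *>> Mono<T>.repeatWhileEmpty(maxRepeat: Int = 3): Mono<T> =
    filter { it.isNotEmpty() }
        .repeatWhenEmpty(maxRepeat) { repeats -> repeats }

// Combine all calls with Mono.zip and collect the values into one list.
@Suppress("UNCHECKED_CAST")
fun fetchAll(calls: List<Mono<Map<String, Any>>>): Mono<List<Map<String, Any>>> =
    Mono.zip(calls.map { it.repeatWhileEmpty() }) { results ->
        results.map { it as Map<String, Any> }
    }

fun main() {
    val calls = listOf(
        flakyUserItem("alice", emptyTimes = 0),
        flakyUserItem("bob", emptyTimes = 2),
    )
    // One block() at the edge of the application instead of one per item.
    val statistics = fetchAll(calls).block()
    println(statistics)
}
```

With this shape the retry logic lives inside each Mono, so the second manual pass over the list is no longer needed, and only a single block() remains. In a real service you would probably add a delay to the companion Flux (for example with delayElements) instead of repeating immediately.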