Search code examples
scalafuturesequentialfor-comprehensionfoldleft

Scala - Execute arbitrary number of Futures sequentially but dependently


I'm trying to figure out the neatest way to execute a series of Futures in sequence, where one Future's execution depends on the previous. I'm trying to do this for an arbitrary number of futures.

User case:

  • I have retrieved a number of Ids from my database.
  • I now need to retrieve some related data on a web service.
  • I want to stop once I've found a valid result.
  • I only care about the result that succeeded.

Executing these all in parallel and then parsing the collection of results returned isn't an option. I have to do one request at a time, and only execute the next request if the previous request returned no results.

The current solution is along these lines. Using foldLeft to execute the requests and then only evaluating the next future if the previous future meets some condition.

def dblFuture(i: Int) = { i * 2 }
val list = List(1,2,3,4,5)
val future = list.foldLeft(Future(0)) {
  (previousFuture, next) => {
    for {
      previousResult <- previousFuture
      nextFuture <- { if (previousResult <= 4) dblFuture(next) else previousFuture }
    } yield (nextFuture)
  }
}

The big downside of this is a) I keep processing all items even once i've got a result i'm happy with and b) once I've found the result I'm after, I keep evaluating the predicate. In this case it's a simple if, but in reality it could be more complicated.

I feel like I'm missing a far more elegant solution to this.


Solution

  • Looking at your example, it seems as though the previous result has no bearing on subsequent results, and instead what only matters is that the previous result satisfies some condition to prevent the next result from being computed. If that is the case, here is a recursive solution using filter and recoverWith.

    def untilFirstSuccess[A, B](f: A => Future[B])(condition: B => Boolean)(list: List[A]): Future[B] = {
        list match {
            case head :: tail => f(head).filter(condition).recoverWith { case _: Throwable => untilFirstSuccess(f)(condition)(tail) }
            case Nil => Future.failed(new Exception("All failed.."))
        }
     }
    

    filter will only be called when the Future has completed, and recoverWith will only be called if the Future has failed.

    def dblFuture(i: Int): Future[Int] = Future { 
         println("Executing.. " + i)
         i * 2 
     }
    
    val list = List(1, 2, 3, 4, 5)
    
    scala> untilFirstSuccess(dblFuture)(_ > 6)(list)
    Executing.. 1
    Executing.. 2
    Executing.. 3
    Executing.. 4
    res1: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@514f4e98
    
    scala> res1.value
    res2: Option[scala.util.Try[Int]] = Some(Success(8))