Search code examples
scalatimeoutfuture

Scala future sequence and timeout handling


There are some good hints how to combine futures with timeouts. However I'm curious how to do this with Future sequence sequenceOfFutures

My first approach looks like this

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits._

object FutureSequenceScala extends App {
  println("Creating futureList")

  val timeout = 2 seconds
  val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
    val f = future {
      Thread sleep ms
      ms toString
    }
    Future firstCompletedOf Seq(f, fallback(timeout))
  }

  println("Creating waitinglist")
  val waitingList = Future sequence futures
  println("Created")

  val results = Await result (waitingList, timeout * futures.size)
  println(results)

  def fallback(timeout: Duration) = future {
    Thread sleep (timeout toMillis)
    "-1"
  }
}

Is there a better way to handle timeouts in a sequence of futures or is this a valid solution?


Solution

  • There are a few things in your code here that you might want to reconsider. For starters, I'm not a huge fan of submitting tasks into the ExecutionContext that have the sole purpose of simulating a timeout and also have Thread.sleep used in them. The sleep call is blocking and you probably want to avoid having a task in the execution context that is purely blocking for the sake of waiting a fixed amount of time. I'm going to steal from my answer here and suggest that for pure timeout handling, you should use something like I outlined in that answer. The HashedWheelTimer is a highly efficient timer implementation that is mush better suited to timeout handling than a task that just sleeps.

    Now, if you go that route, the next change I would suggest concerns handling the individual timeout related failures for each future. If you want an individual failure to completely fail the aggregate Future returned from the sequence call, then do nothing extra. If you don't want that to happen, and instead want a timeout to return some default value instead, then you can use recover on the Future like this:

    withTimeout(someFuture).recover{
      case ex:TimeoutException => someDefaultValue
    }
    

    Once you've done that, you can take advantage of the non-blocking callbacks and do something like this:

    waitingList onComplete{
      case Success(results) => //handle success
      case Failure(ex) => //handle fail
    }
    

    Each future has a timeout and thus will not just run infinitely. There is no need IMO to block there and provide an additional layer of timeout handling via the atMost param to Await.result. But I guess this assumes you are okay with the non-blocking approach. If you really need to block there, then you should not be waiting timeout * futures.size amount of time. These futures are running in parallel; the timeout there should only need to be as long as the individual timeouts for the futures themselves (or just slightly longer to account for any delays in cpu/timing). It certainly should not be the timeout * the total number of futures.