multithreading scala future blocking

Futures that flatMap a list of futures are doing a blocking operation, so why not enclose the code in blocking {..}?


While working through some of the exercises and videos from the Coursera class on Reactive Programming, I saw this definition of a method to 'sequence' a List of futures. The method returns a Future which will do the work of waiting on all the futures in fts (see code below) and packaging those results in a List[T] that will be available when the Future[List[T]] returned by sequence is complete.

def sequence[T](fts: List[Future[T]]): Future[List[T]] = {
    fts match {
        case Nil => Future(Nil)
        case (ft::fts) => ft.flatMap(t => sequence(fts)
            .flatMap(ts => Future(t::ts)))
    }
}

This code was given by the instructor, so I'm guessing that it should represent the optimal pattern for how to do this kind of thing. However, elsewhere in the lectures the instructors state:

Whenever you have a long-running computation or blocking make sure to run it inside the blocking construct. For example:

    blocking {
      Thread.sleep(1000)
    } 

is used to designate a piece of code as potentially blocking. An asynchronous computation that has a blocking construct is typically scheduled in a separate thread to avoid potential deadlock. Example: let's say you have a future f that waits on a timer or for a resource or a monitor condition that can only be fulfilled by some other future g. In that case, the part of the code in f that does the waiting should be wrapped in the blocking, otherwise the future g might never be run.
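
To make the quoted advice concrete, here is a minimal sketch of the blocking construct inside futures. The helper slowCall and the object name are hypothetical, and the sketch assumes the default global ExecutionContext, which is allowed to add threads when a task announces that it is about to block.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingSketch {
  // Hypothetical helper: stands in for any call that parks the current thread.
  def slowCall(id: Int): Int = {
    Thread.sleep(100)
    id * 2
  }

  // Each Future wraps only its waiting section in `blocking`, which hints to
  // the ExecutionContext that this thread will be unavailable for a while.
  def runAll(n: Int): List[Int] = {
    val futures = (1 to n).toList.map { id =>
      Future { blocking { slowCall(id) } }
    }
    // Awaiting here is only for the demo; it is the caller that blocks.
    Await.result(Future.sequence(futures), 10.seconds)
  }

  def main(args: Array[String]): Unit =
    println(runAll(4)) // prints List(2, 4, 6, 8)
}
```

Note that blocking is wrapped around the code that actually parks a thread (the sleep), not around code that merely combines futures.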

Now... what I don't understand is why the 'match' expression was not wrapped in a 'blocking' expression. Don't we expect all the flatMapping to take (potentially) a noticeable chunk of time?

Side Note: There is an 'official' sequence method in the scala.concurrent.Future class and that implementation also does not use blocking.

I will also post this to the Coursera forums and if I get an answer back, I will post here as well.


Solution

  • Don't we expect all the flatMapping to take (potentially) a noticeable chunk of time?

    Nope. flatMap just constructs a new Future and returns immediately. It doesn't block.
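
One way to check this claim is to call flatMap on a future that cannot have a value yet. The sketch below is illustrative only (the object and method names are made up): it uses a Promise that nobody has completed, so if flatMap blocked, the call would never return.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlatMapReturnsImmediately {
  // Returns (was the derived future already complete right after flatMap?,
  //          the value it eventually produced).
  def demo(): (Boolean, Int) = {
    val p = Promise[Int]() // nothing has completed this promise yet

    // flatMap only registers a callback and hands back a new Future.
    val derived: Future[Int] = p.future.flatMap(x => Future.successful(x + 1))

    // We got here although p.future has no value, so flatMap did not wait.
    val completedEarly = derived.isCompleted

    p.success(41) // now the registered callback can run
    (completedEarly, Await.result(derived, 5.seconds))
  }

  def main(args: Array[String]): Unit =
    println(demo()) // prints (false,42)
}
```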

    See the default implementation of flatMap. Here's a simplified version of it:

    trait Future[+T] {
    
      def flatMap[S](f: T => Future[S])
                    (implicit executor: ExecutionContext): Future[S] = {
    
        val promise = Promise[S]()
    
        this.onComplete {
    
          // The first Future (this) failed
          case Failure(t) => promise.failure(t)
    
          case Success(v1) =>
    
            // Apply the flatMap function (f) to the first Future's result
            Try(f(v1)) match {
    
              // The flatMap function (f) threw an exception
              case Failure(t) => promise.failure(t)
    
              case Success(future2) =>
                future2.onComplete {
    
                  // The second Future failed
                  case Failure(t) => promise.failure(t)
    
                  // Both futures succeeded - Complete the promise
                  // successfully with the second Future's result.
                  case Success(v2) => promise.success(v2)
                }
            }
        }
        promise.future
      }
    }
    

    Overview of what happens when you call flatMap:

    1. Create a promise
    2. Add a callback to this future
    3. Return the promise's future (promise.future)

  • The method returns a Future which will do the work of waiting on all the futures

    I think that description is somewhat misleading. The Future that you get back from Future.sequence doesn't really "do work". As you can see in the code above, the Future you get from flatMap (and thus the Future you get from Future.sequence) is just a promise that will eventually be completed by something else. The only thing that ever really does anything is the ExecutionContext; the Futures just specify what to do.
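
The same point can be illustrated with a variant of the sequence method from the question. This is a sketch under a few assumptions, not the course's or the library's code: it uses Future.successful and map in place of Future(...) and the inner flatMap, renames the shadowed variable to rest, and drives the inputs with hand-made promises so that the completion order is under our control.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceSketch {
  // Same shape as the method in the question; no `blocking` is needed
  // because every step only registers callbacks.
  def sequence[T](fts: List[Future[T]]): Future[List[T]] = fts match {
    case Nil        => Future.successful(Nil)
    case ft :: rest => ft.flatMap(t => sequence(rest).map(ts => t :: ts))
  }

  def demo(): (Boolean, List[Int]) = {
    val promises = List.fill(3)(Promise[Int]())
    val all = sequence(promises.map(_.future)) // returns at once

    val completedEarly = all.isCompleted // false: no input is done yet

    // Complete the inputs in reverse order. The combined future is finished
    // by these completions, and the result keeps the original list order.
    promises.zipWithIndex.reverse.foreach { case (p, i) => p.success(i) }
    (completedEarly, Await.result(all, 5.seconds))
  }

  def main(args: Array[String]): Unit =
    println(demo()) // prints (false,List(0, 1, 2))
}
```

The future returned by sequence never waits on anything itself; it is completed as a side effect of the inputs being completed.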