Search code examples
scalafuturefor-comprehension

For comprehension: how to run Futures sequentially


Given the following methods...

def doSomething1: Future[Int] = { ... }
def doSomething2: Future[Int] = { ... }
def doSomething3: Future[Int] = { ... }

... and the following for-comprehension:

for {
  x <- doSomething1
  y <- doSomething2
  z <- doSomething3
} yield x + y + z

The three methods run in parallel, but in my case doSomething2 MUST run after doSomething1 has finished. How do I run the three methods in sequence?

EDIT

As suggested by Philosophus42, here below is a possible implementation of doSomething1:

def doSomething1: Future[Int] = {
  // query the database for customers younger than 40;
  // `find` returns a `Future` containing the number of matches
  customerService.find(Json.obj("age" -> Json.obj("$lt" -> 40)))
}

... so the Future is created by an internal call to another method.

EDIT 2

Perhaps I simplified the use case too much... and I'm sorry. Let's try again and go closer to the real use-case. Here are the three methods:

for {
  // get all the transactions generated by the exchange service
  transactions <- exchange.orderTransactions(orderId)

  //for each transaction create a log
  logs <- Future.sequence(tansactions.map { transaction =>
    for {
      // update trading order status
      _ <- orderService.findAndUpdate(transaction.orderId, "Executed")

      // create new log
      log <- logService.insert(Log(
        transactionId => transaction.id,
        orderId => transaction.orderId,
        ...
      ))
    } yield log
  })
} yield logs

What I'm trying to do is to create a log for each transaction associated with an order. logService.insert gets invoked many times even if transactions just contains one entry.


Solution

  • Comment on your post

    First, how does the code inside doSomethingX look like? Even more irrated, that with your given code, the futures run parallel.

    Answer

    In order to make the Future execution sequential, just use

    for {
      v1 <- Future { ..block1... } 
      v2 <- Future { ..block2... } 
    } yield combine(v1, v2)
    

    The reason this works, is that the statement Future { ..body.. } starts asynchronous computation, at that point in time the statement is evaluated.

    With the above for-comprehension desugared

    Future { ..block1.. }
      .flatMap( v1 => 
         Future { ..block>.. }
           .map( v2 => combine(v1,v2) )
      )
    

    it is obvious, that

    • if Future{ ...block1... } has it's result available,
    • the flatMap method is triggered, which
    • then triggers execution of Future { ...block2... }.

    Thus Future { ...block2... } is executed after Future { ...block1... }

    Additional information

    A Future

    Future { 
      <block> 
    } 
    

    immediately triggers execution of contained block via the ExecutionContext

    Snippet 1:

    val f1 = Future { <body> }
    val f2 = Future { <otherbody> }
    

    The two computations are running parallel (in case your ExecutionContext is setup this way), as the two values are evaluated immediately.

    Snippet 2:

    The construct

    def f1 = Future { ..... }
    

    will start execution of the future, once f1 is called

    Edit:

    j3d, I'm still confused, why your code does not work as expected, if your statement is correct, that the Future is created within the computeSomethingX methods.

    Here is a code snippet that proves, that computeSomething2 is executed after computeSomething1

    import scala.concurrent.{Await, Future} import scala.concurrent.duration._

    object Playground {
    
      import scala.concurrent.ExecutionContext.Implicits.global
    
      def computeSomething1 : Future[Int] = {
        Future {
          for (i <- 1 to 10) {
            println("computeSomething1")
            Thread.sleep(500)
          }
          10
        }
      }
    
      def computeSomething2 : Future[String] = {
        Future {
          for(i <- 1 to 10) {
            println("computeSomething2")
            Thread.sleep(800)
          }
          "hello"
        }
      }
    
      def main(args: Array[String]) : Unit = {
    
        val resultFuture: Future[String] = for {
          v1 <- computeSomething1
          v2 <- computeSomething2
        } yield v2 + v1.toString
    
        // evil "wait" for result
    
        val result = Await.result(resultFuture, Duration.Inf)
    
        println( s"Result: ${result}")
      }
    }
    

    with output

    computeSomething1
    computeSomething1
    computeSomething1
    computeSomething1
    computeSomething1
    computeSomething1
    computeSomething1
    computeSomething1
    computeSomething1
    computeSomething1
    computeSomething2
    computeSomething2
    computeSomething2
    computeSomething2
    computeSomething2
    computeSomething2
    computeSomething2
    computeSomething2
    computeSomething2
    computeSomething2
    Result: hello10
    

    Edit 2

    If you want them to be executed in parallel, create the futures beforehand (here f1 and f2)

    def main(args: Array[String]) : Unit = {
      val f1 = computeSomething1
      val f2 = computeSomething2
    
      val resultFuture: Future[String] = for {
        v1 <- f1
        v2 <- f2
      } yield v2 + v1.toString
    
      // evil "wait" for result
    
      val result = Await.result(resultFuture, Duration.Inf)
    
      println( s"Result: ${result}")
    }