Search code examples
scalaakkaactor

How to get data from a future in an Actor System written in Scala?


I have an Actor say ProcessTheCommand actor which has a receive method implemented as below:

class ProcessTheCommand extends Actor {
   def receive = {
     case obj: DataObject => 
       val f1 = OneActor ? obj
       val f2 = SecondActor ? obj
       // val result= resultFromf1 + resultFromf2
       // do something using the result
   }
}

My problem: How do i get the data out of the two futures?

One way to do it is to use await but that is not the best practice. Suggestions?


Solution

  • There are two parts to the question. The first is how to extract two futures and add them together, and the second one is how we process the result of these futures in Akka.

    Composing Futures:

    There a couple of ways. I'm assuming the results of the futures is the same. One would be using for comprehension:

    val firstFuture: Future[Int] = ???
    val secondFuture: Future[Int] = ???
    val result: Future[Int] = for {
      first <- firstFuture
      second <- secondFuture
    } yield first + second
    

    Another option would be to use Future.sequence:

    val firstFuture: Future[Int] = ???
    val secondFuture: Future[Int] = ???
    
    val result: Future[Int] = Future
      .sequence(Seq(firstFuture, secondFuture))
      .map {
        results => results.sum
      }
    

    Another would be zip with map (thanks @ViktorKlang):

    firstFuture
     .zip(secondFuture)
     .map { case (first, second) => first + second }
    

    Or with Scala 2.12 zipWith:

    val result: Future[Int] = firstFuture.zipWith(secondFuture) {
      case (first, second) => first + second
    }
    

    Extracting the value inside the Actor:

    The only missing piece is how we get the accumulated result. The pattern in Akka is to pipe the result to your own Receive, since we never want to block on the method call, what we actually want is the future to invoke a callback once it completes, which is exactly what pipeTo will do.

    We'll create a custom case class which encapsulates the result:

    case class AccumulatedResult(result: Int)
    

    And add it in Receive:

    import akka.pattern.pipe
    override def receive: Receive = {
      case obj: DataObject => 
        val firstFuture = OneActor ? obj
        val secondFuture = SecondActor ? obj
        firstFuture
         .zip(secondFuture)
         .map { case (first, second) => AccumulatedResult(first + second) }
         .pipeTo(self)
    
      case AccumulatedResult(res) => println(res)
    }
    

    The nice thing about this is that once the future completes the handling of the message will continue as part of the flow handling logic of the actor.