I have an Actor say ProcessTheCommand actor which has a receive method implemented as below:
class ProcessTheCommand extends Actor {
def receive = {
case obj: DataObject =>
val f1 = OneActor ? obj
val f2 = SecondActor ? obj
// val result= resultFromf1 + resultFromf2
// do something using the result
}
}
My problem: How do i get the data out of the two futures?
One way to do it is to use await
but that is not the best practice. Suggestions?
There are two parts to the question. The first is how to extract two futures and add them together, and the second one is how we process the result of these futures in Akka.
There a couple of ways. I'm assuming the results of the futures is the same. One would be using for comprehension:
val firstFuture: Future[Int] = ???
val secondFuture: Future[Int] = ???
val result: Future[Int] = for {
first <- firstFuture
second <- secondFuture
} yield first + second
Another option would be to use Future.sequence
:
val firstFuture: Future[Int] = ???
val secondFuture: Future[Int] = ???
val result: Future[Int] = Future
.sequence(Seq(firstFuture, secondFuture))
.map {
results => results.sum
}
Another would be zip
with map
(thanks @ViktorKlang):
firstFuture
.zip(secondFuture)
.map { case (first, second) => first + second }
Or with Scala 2.12 zipWith
:
val result: Future[Int] = firstFuture.zipWith(secondFuture) {
case (first, second) => first + second
}
The only missing piece is how we get the accumulated result. The pattern in Akka is to pipe the result to your own Receive
, since we never want to block on the method call, what we actually want is the future to invoke a callback once it completes, which is exactly what pipeTo
will do.
We'll create a custom case class which encapsulates the result:
case class AccumulatedResult(result: Int)
And add it in Receive
:
import akka.pattern.pipe
override def receive: Receive = {
case obj: DataObject =>
val firstFuture = OneActor ? obj
val secondFuture = SecondActor ? obj
firstFuture
.zip(secondFuture)
.map { case (first, second) => AccumulatedResult(first + second) }
.pipeTo(self)
case AccumulatedResult(res) => println(res)
}
The nice thing about this is that once the future completes the handling of the message will continue as part of the flow handling logic of the actor.