I am wondering if there is a better way to handle async initialization of values within an Actor. Actors are of course thread-safe as long as you stay inside the actor, but using Futures throws a wrinkle into that (and you have to make sure you don't close over context or sender). Consider the following:
class ExampleActor(ref1: ActorRef, ref2: ActorRef) extends Actor {
  implicit val ec = context.dispatcher

  val promise1 = Promise[Int]()
  val promise2 = Promise[Int]()

  def receive = {
    case Request1.Response(x) => promise1.success(x)
    case Request2.Response(y) => promise2.success(y)
    case CombinedResponse(x, y) => x + y
  }

  // Once both promises are completed, send the combined result back to self
  promise1.future foreach { x =>
    promise2.future foreach { y =>
      self ! CombinedResponse(x, y)
    }
  }

  // Fire both requests in parallel
  ref1 ! Request1
  ref2 ! Request2
}
Is there a better/more idiomatic way of handling parallel requests like this?
You don't actually need futures to handle a multi-part response:
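// parent: ActorRef is assumed to be in scope, as in the Worker example below
var x: Option[Int] = None
var y: Option[Int] = None

def receive = {
  // The pattern variables are named a and b so they don't shadow the vars x and y
  case Request1.Response(a) => x = Some(a); checkParts()
  case Request2.Response(b) => y = Some(b); checkParts()
}

// Sends the sum only once both parts have arrived
def checkParts(): Unit = for {
  xx <- x
  yy <- y
} parent ! xx + yy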
By the way, you can use a for-comprehension in the same way with futures.
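As a minimal sketch of that remark, the snippet below combines two futures with a for-comprehension instead of nested foreach calls. It uses only scala.concurrent; the object name FutureForComprehension and the helper combine are hypothetical names introduced for illustration, and the two promises stand in for the responses from the question.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object FutureForComprehension {
  // Hypothetical helper: combines two futures that are already in flight.
  // Both futures exist before the for-comprehension runs, so neither
  // request waits for the other; only the results are sequenced.
  def combine(fx: Future[Int], fy: Future[Int])(implicit ec: ExecutionContext): Future[Int] =
    for {
      x <- fx
      y <- fy
    } yield x + y

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global

    val promise1 = Promise[Int]()
    val promise2 = Promise[Int]()
    val combined = combine(promise1.future, promise2.future)

    // Stand-ins for the two responses, arriving in either order
    promise2.success(2)
    promise1.success(1)

    // Blocking here is only for the demo; prints 3
    println(Await.result(combined, 1.second))
  }
}
```

Inside an actor, the combined result would still have to come back as a message (as the question does with CombinedResponse) rather than touch the actor's state from the callback.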
A more functional way to manage the actor's state:
case class Resp1(x: Int)
case class Resp2(y: Int)
case class State(x: Option[Int], y: Option[Int])

class Worker(parent: ActorRef) extends Actor {
  def receive = process(State(None, None))

  def process(s: State): Receive = edge(s) andThen { sn =>
    context become process(sn)
    for {
      xx <- sn.x
      yy <- sn.y
    } parent ! xx + yy // action
  }

  def edge(s: State): PartialFunction[Any, State] = { // managing state
    case Resp1(x) => s.copy(x = Some(x))
    case Resp2(y) => s.copy(y = Some(y))
  }
}
Reusing the actor instead of creating a future is better because promise.success actually performs an unmanageable side effect by submitting a task to an executor, so it is not a purely functional approach. Actor state is better because side effects inside an actor are always consistent with that actor's other side effects: they are applied step by step and only in response to some message. So you can view the actor as a fold over an infinite collection of messages; the state and the messages sent by the actor (also infinite) can be seen as the fold's accumulator.
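The fold analogy can be sketched without Akka at all. In the snippet below, a finite list stands in for the (conceptually infinite) mailbox, and foldLeft plays the role of the actor processing messages one at a time. The names ActorAsFold, Msg and output are hypothetical; edge mirrors the state-transition function from the Worker example above.

```scala
object ActorAsFold {
  sealed trait Msg
  final case class Resp1(x: Int) extends Msg
  final case class Resp2(y: Int) extends Msg

  final case class State(x: Option[Int], y: Option[Int])

  // State transition: old state + one message => new state
  def edge(s: State, m: Msg): State = m match {
    case Resp1(x) => s.copy(x = Some(x))
    case Resp2(y) => s.copy(y = Some(y))
  }

  // What the actor would send to its parent, if both parts are present
  def output(s: State): Option[Int] = for {
    xx <- s.x
    yy <- s.y
  } yield xx + yy

  def main(args: Array[String]): Unit = {
    // Finite stand-in for the message stream; order does not matter here
    val mailbox: List[Msg] = List(Resp2(2), Resp1(1))
    val finalState = mailbox.foldLeft(State(None, None))(edge)
    println(output(finalState)) // prints Some(3)
  }
}
```

Because each step depends only on the previous state and the current message, there is no point at which two updates can interleave, which is the consistency property described above.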
Talking about Akka, its actors have some IoC-like features, such as automatic exception handling (through supervision), which are not available inside a future. In your case, you have to introduce an additional composite message to get back into the actor's IoC context. Adding any action other than self ! CombinedResponse(x, y) inside the future callback (which another developer might accidentally do, for example, to implement a workaround) is unsafe.