Take a look at the following diagram:
In this scenario we have two Managers. ManagerA and ManagerB, both spawned by the same father at the same time. ManagerB also immediately spawns 3 children that start processing some data. When the worker is done processing and has a result, he messages ManagerB back with the result.
ManagerB then informs ManagerA that he has an object called B1 ready. ManagerA then tells A1,A2 and A3 that B1 is ready an passes it to them. But only A1 and A2 need B1 so they keep it and A3 discards the message. A1 has all his requirements to start executing his main logic now since he has B1, so he does so. Meanwhile A2 still needs B2 and A3 needs B3.
How would one implement such logic using akka actors in Scala? I'm having a particularly hard time with finding a way to hold the requirements in a functional way and starting execution when all requirements are finally met.
The basic idea I would use is to encode the state in a case class with at minimum a set of requirements and a map whose keys are a (non-strict) subset of the requirements and whose values are the results from the B workers, e.g. for A1
, we might have:
requirements = Set("B1")
received = Map("B1" -> B1Result(...))
Your actor's behavior is then a function of its state: messages which change the state change the behavior.
More concretely, you might do something like:
object AWorker {
type BResult = Any // BResult would probably defined elsewhere, I'm aliasing Any for brevity
type Action = (ActorContext[Command], Map[String, BResult]) => Unit
sealed trait Command
case class RequirementsAndDo(
requirements: Set[String],
action: Action,
replyTo: Option[ActorRef[Reply]]
) extends Reply
case class Completed(key: String, value: BResult, replyTo: Option[ActorRef[Reply]]) extends Reply
sealed trait Reply
case object Ack extends Reply
def apply(): Behavior[Command] = withState(State())
private[AWorker] case class State(
requirements: Set[String] = Set.empty,
received: Map[String, BResult] = Map.empty,
whenRequirementsMet: Option[Action] = None
) {
def updateRequirements(req: Set[String], f: Option[Action]): State = {
// For now, start anew...
State(req)
}
def completeRequirement(req: String, v: BResult): State =
copy(received = received.updated(req, v))
def checkAndPerform(ctx: ActorContext[Command]): State = {
if (requirements.diff(received.keySet).isEmpty) {
whenRequirementsMet.foreach { a =>
a(ctx, received)
}
State()
} else this
}
private def withState(state: State): Behavior[Command] =
Behaviors.receive { (context, msg) =>
msg match {
case RequirementsAndDo(req, action, replyTo) =>
val nextState = state.updateRequirements(req, action)
replyTo.foreach(_ ! Ack)
withState(nextState)
case Completed(key, value, replyTo) =>
val intermediateState = state.completeRequirement(key, value)
replyTo.foreach(_ ! Ack)
withState(intermediateState.checkAndPerform(context))
}
}
}
You can make this even more FP by having the methods on State
return a tuple of effects and the new state, but this IMO is close to the optimally pragmatic functionality. Note particularly that this encourages separation of domain effects (e.g. around work completion) from implementation protocol effects (e.g. replies).