Search code examples
javascalaakka

How to make part of Akka ActorSystem synchronous in Typed Akka?


I am trying to make part of the code synchronous and the other part asynchronous. What I want to achieve is getting the latest Map from each child Actor in an Actor System. So basically wait for the asynchronous calls of messages to complete and then call a synchronous procedure. Until now I have failed to achieve this kind of behaviour (see TODO in MasterSite.scala). Any idea's on how to do that in Scala Typed Akka?

MasterSite.scala

object MasterSite {
  sealed trait MasterSiteProtocol
  final case class Broadcast(
                              msg: Site.SiteProtocol,
                              from: ActorRef[Site.SiteProtocol],
                              partitionSet: Set[ActorRef[SiteProtocol]]
                            ) extends MasterSiteProtocol

  def apply(): Behavior[MasterSiteProtocol] =
    Behaviors.setup { context =>
     // create/spawn actors (here actors represent Sites)
      val siteA = context.spawn(Site(), "A")
      val siteB = context.spawn(Site(), "B")
      val siteC = context.spawn(Site(), "C")
      val siteD = context.spawn(Site(), "D")
      
      var init_sitePartitionList: List[Set[ActorRef[SiteProtocol]]] = List(Set(siteA,siteB,siteC,siteD))
      
      val time_a1 = System.currentTimeMillis().toString

      val partitionSet1 = findPartitionSet(siteA, init_sitePartitionList) // returns Set(siteA, siteB, siteC, siteD)
      siteA ! Site.FileUpload(time_a1, context.self, "test.txt", partitionSet1)

      // split into List(Set{A,B}, Set{C,D})
      init_sitePartitionList = splitPartition(init_sitePartitionList, Set(siteA, siteB))

      val partitionSet2 = findPartitionSet(siteA, init_sitePartitionList) // returns Set(siteA, siteB)
      siteA ! Site.FileUpdate(("A", time_a1), context.self, partitionSet2)
      siteA ! Site.FileUpdate(("A", time_a1), context.self, partitionSet2)

      init_sitePartitionList = mergePartition(init_sitePartitionList, Set(siteA, siteB, siteC, siteD)) // returns Set(siteA, siteB, siteC, siteD)

      val partitionSet3 = findPartitionSet(siteA, init_sitePartitionList) // returns Set(siteA, siteB, siteC, siteD)
      siteA ! Merged(siteC, context.self, partitionSet3) // --> TODO: when code reaches this point, I want that all the message calls before this call are fully executed by the sites and then this message should be sent and handled by `siteA`. How to achieve that?

      Behaviors.receiveMessage {
        case Broadcast(msg: SiteProtocol, from: ActorRef[SiteProtocol], partitionSet: Set[ActorRef[SiteProtocol]]) =>
          partitionSet.foreach { child =>
            if(!child.equals(from)) {
              child ! msg
            }
          }
          Behaviors.same
      }
    }

}

Solution

  • In general there is no way out of the box to do this.

    The only way to know that a message has been processed is for the recipient of that message (or its designee) to send a message in reply. This requires designing a protocol (in this case in the SiteProtocol) around this request-reply interaction. The sender then keeps track of which replies it's waiting for (correlation IDs or being sure that you don't have concurrent interactions with the same actor are helpful here) and when all the replies are received it can then move on to doing the next thing.

    Likewise, a request-response protocol can be used to gather values computed by the site actors. Note that there are no intrinsic protections against the actors processing other messages (and possibly changing their own state), though limiting how widely (and how general the message type) the ActorRefs are disseminated can limit this.

    It is technically possible to have a side-channel using non-Akka concurrency approaches to share data between actors: note that if going this route, one is largely throwing away the benefits of the actor model (especially the single-threaded illusion and location transparency). If considering this, it's also worth considering not using Akka.