Akka broadcast ask doesn't deliver to all routees


I am learning to use broadcast messages with Akka routers. Is there a way to use ask (?) on a router and receive the replies from all of its routees?

I have this sample code.

Master.scala

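import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.{Broadcast, RoundRobinPool}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object Master {
    case object brdcst
}

class Master extends Actor {
    import Master.brdcst

    implicit val timeout: Timeout = Timeout(5.seconds)

    val router: ActorRef = context.actorOf(RoundRobinPool(3).props(Props[Worker]), "router")

    override def receive: Receive = {
        // Backticks match the brdcst object itself; a bare lowercase name would bind any message.
        case `brdcst` => {
            val future = router ? Broadcast(brdcst)
            val result = Await.result(future, timeout.duration)
            println("result = " + result)
        }
    }
}

object MasterTest extends App {
    import Master.brdcst

    val actorSystem = ActorSystem("ActorSystem")
    val actor = actorSystem.actorOf(Props[Master], "root")
    actor ! brdcst
}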

Worker.scala

import akka.actor.Actor

class Worker extends Actor {
    import Master.brdcst

    override def receive: Receive = {
        // Reply to whoever sent the broadcast with this routee's name.
        case `brdcst` => sender() ! self.path.name
    }
}

This code gives the following output:

result = $a
[INFO] [10/16/2018 21:47:07.484] [ActorSystem-akka.actor.default-dispatcher-2] [akka://ActorSystem/deadLetters] Message [java.lang.String] from Actor[akka://ActorSystem/user/root/router/$a#340358688] to Actor[akka://ActorSystem/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ActorSystem/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [10/16/2018 21:47:07.504] [ActorSystem-akka.actor.default-dispatcher-10] [akka://ActorSystem/deadLetters] Message [java.lang.String] from Actor[akka://ActorSystem/user/root/router/$b#-151225340] to Actor[akka://ActorSystem/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ActorSystem/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

So only one routee's reply arrives. How can I get responses from all the routees, for example as a list like result = [$a, $b, $c]?


Solution

  • ask (?) creates a temporary internal actor to receive the reply. That actor handles a single reply and then stops, which is why you get only the first response while the replies from the other two routees go to dead letters.

    To get the desired behavior, use tell (!) and collect the responses from the routees. For example:

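    import akka.actor.{Actor, ActorRef, Props}
    import akka.routing.{Broadcast, RoundRobinPool}

    class Master extends Actor {
      import Master.brdcst

      val numRoutees = 3
      val router: ActorRef = context.actorOf(RoundRobinPool(numRoutees).props(Props[Worker]), "router")

      // The replies collected so far are carried as a parameter of the behavior.
      def handleMessages(replies: Set[String] = Set.empty): Receive = {
        // Backticks match the brdcst object; a bare lowercase name would match every message.
        case `brdcst` =>
          router ! Broadcast(brdcst)
        case reply: String =>
          val updatedReplies = replies + reply
          if (updatedReplies.size == numRoutees) {
            println("result = " + updatedReplies.mkString("[", ",", "]"))
          }
          context.become(handleMessages(updatedReplies))
      }

      // The default argument needs explicit parentheses here.
      def receive: Receive = handleMessages()
    }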
    

    In the above example, the master keeps the replies received so far as part of its behavior: each reply installs a new handleMessages with the updated set, using become.

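    Also, don't use Await inside an actor: it blocks the actor's thread, so the actor cannot process any other message until the future completes or times out.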
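
As a rough illustration of why only the first reply gets through with ask, the sketch below models the temporary reply actor as a one-shot Promise, using only the Scala standard library. SingleReplySlot, deliver and deadLetters are hypothetical names invented for this sketch, not Akka APIs, and the real implementation differs in detail. The sketch is single-threaded and is not meant to be used from several threads.

```scala
import scala.concurrent.Promise

// Hypothetical stand-in for the temporary actor behind `?`:
// it owns one Promise, and a Promise can be completed only once.
final class SingleReplySlot {
  private val promise = Promise[String]()
  private var dropped = Vector.empty[String]

  // Returns true if this reply completed the promise, false if it came too late.
  def deliver(reply: String): Boolean = {
    val accepted = promise.trySuccess(reply)
    if (!accepted) dropped :+= reply // in Akka such replies end up in dead letters
    accepted
  }

  // The single reply the asker would see, if any has arrived.
  def result: Option[String] = promise.future.value.flatMap(_.toOption)

  // Replies that arrived after the promise was already completed.
  def deadLetters: Vector[String] = dropped
}

object SingleReplyDemo extends App {
  val slot = new SingleReplySlot
  // Three routees answer the same broadcast; only the first reply can be accepted.
  Seq("$a", "$b", "$c").foreach(slot.deliver)
  println("result = " + slot.result.getOrElse("<none>"))
  println("dead letters = " + slot.deadLetters.mkString("[", ",", "]"))
}
```

Running SingleReplyDemo should show one accepted reply and two dropped ones, which has the same shape as the output in the question: one result and two undelivered replies. Collecting with tell avoids this because the master's own mailbox, unlike the one-shot slot, accepts every reply.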