
Akka round-robin: Sending response from remote routees to sender


I am using Akka Cluster (version 2.4.10) with a few nodes designated for the "front-end" role and a few others as "workers". The workers run on remote machines. The front-end actor distributes incoming work to the workers through round-robin routing. The problem is getting the response from the workers back to the front-end actor: I can see that the workers complete the work, but the messages they send back never reach the front-end and end up as dead letters. I see the error below in the log.

[Cluster-akka.actor.default-dispatcher-21] [akka://Cluster/deadLetters] Message [scala.collection.immutable.$colon$colon] from Actor[akka://Cluster/user] to Actor[akka://Cluster/deadLetters] was not delivered. [6] dead letters encountered.

I have seen this and I am following the same approach in my code. I have also seen this, but the suggested solution does not apply in my case because I do not know the routees up front: they come from the configuration and can change. The round-robin router configuration is as follows.

akka.actor.deployment {
  /frontEnd/hm = {
    router = round-robin-group
    nr-of-instances = 5
    routees.paths = ["/user/hmWorker"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = on
    }
  }
}

The router is instantiated in the front-end actor as follows.

val router = context.actorOf(FromConfig.props(), name = "hm")
val controller = context.actorOf(Props(classOf[Controller], router))

The controller and worker code is below.

// Node 1 : Controller routes requests using round-robin
class Controller(router: ActorRef) extends Actor {

    val list = List("a", "b") // Assume this is a big list

    val groups = list.grouped(500)

    override def receive: Actor.Receive = {
      // receive must be a partial function; the triggering message is not shown, so any message is matched here
      case _ =>
        val futures = groups.map(grp => (router ? Message(grp)).mapTo[List[String]])
        val future = Future.sequence(futures).map(_.flatten)
        val result = Await.result(future, 50 seconds)
        println(s"Result is $result")
    }
}

// Node 2
class Worker extends Actor {

    override def receive: Actor.Receive = {
      case Message(lst) =>
            val future: Future[List[String]] = // Do Something asynchronous
            future onComplete {
                case Success(r) => sender.!(r)(context.parent) // This message is not delivered to Controller actor.
                case Failure(th) => // Error handling
            }
    }
}

Please let me know what I am doing wrong here. Appreciate your help.


Solution

  • You shouldn't call sender() inside a callback on a Future. The callback runs later, on a different thread, and by then sender() may return a different actor (or deadLetters) than it did when you received the message.

    Consider either saving the reference outside of the callback first like:

    override def receive: Actor.Receive = {
      case Message(lst) =>
            val future: Future[List[String]] = // Do Something asynchronous
            val replyTo: ActorRef = sender()
            future onComplete {
                case Success(r) => replyTo.!(r)(context.parent) // replyTo was captured above, so the reply goes to the original sender
                case Failure(th) => // Error handling
            }
    }
    

    Or even better, use the pipe pattern:

    import akka.pattern.pipe
    override def receive: Actor.Receive = {
      case Message(lst) =>
        val future: Future[List[String]] = // Do Something asynchronous
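        // sender() is evaluated here, while the message is still being processed.
        // pipeTo needs an implicit ExecutionContext in scope (for example, import context.dispatcher).
        // If the future fails, the recipient gets an akka.actor.Status.Failure instead.
        future.pipeTo(sender())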
    }
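
The pipe pattern above can be tried end to end with a minimal sketch like the one below. It is only an illustration under stated assumptions: it runs in a single local ActorSystem rather than a cluster, the shape of `Message` is guessed from the question, the upper-casing stands in for the real asynchronous work, and the names `PipeDemo` and `run` are hypothetical.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Assumed message shape, based on the question's `Message(grp)` usage.
final case class Message(items: List[String])

class Worker extends Actor {
  // Supplies the implicit ExecutionContext needed by Future and pipeTo.
  import context.dispatcher

  override def receive: Receive = {
    case Message(items) =>
      // Stand-in for the real asynchronous work (an assumption for this sketch).
      val work: Future[List[String]] = Future(items.map(_.toUpperCase))
      // sender() is read here, on the actor's own thread, before the future completes.
      work.pipeTo(sender())
  }
}

// Hypothetical driver: asks a local Worker and waits for the piped reply.
object PipeDemo {
  def run(): List[String] = {
    val system = ActorSystem("demo")
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val worker = system.actorOf(Props[Worker], "hmWorker")
      val reply = (worker ? Message(List("a", "b"))).mapTo[List[String]]
      // Blocking is acceptable in this driver; avoid it inside an actor's receive.
      Await.result(reply, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling `PipeDemo.run()` should return `List("A", "B")`, which shows the reply reaching the asker instead of dead letters. The same worker code should behave the same way behind a cluster-aware router, since the router forwards the original sender to the routee, but that part is not exercised here.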