Dead Letters using Akka to create a message ring


I'm trying to create an example Akka application using remote actors. The goal is to create, for example, 16 actors that exchange messages sequentially (actor 16 talks to actor 15, 15 to 14, and so on, and actor 1 talks to actor 16). However, I'm having trouble with the communication, since I continuously get this error:

[INFO] [05/04/2017 15:45:58.248] [ActorFlasks-akka.actor.default-dispatcher-4] [akka://ActorFlasks/deadLetters] Message [java.lang.String] from Actor[akka://ActorFlasks/user/16#-2022012132] to Actor[akka://ActorFlasks/deadLetters] was not delivered. [1] dead letters encountered.

To do this, I run 16 terminal instances of the application, each with a different configuration file. I create the ActorSystem in each instance like so:

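import java.io.File
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Main extends App {

    // Instance id passed on the command line, e.g. "16"
    val localId = args(0)

    // Each instance loads its own application<id>.conf from the classpath
    val configFile = getClass.getClassLoader.getResource(s"application$localId.conf").getFile
    val config = ConfigFactory.parseFile(new File(configFile))
    val system = ActorSystem("ActorFlasks", config)
    val remote = system.actorOf(Props[CyclonManager], name = localId)

    remote ! "START"
}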

An example of a configuration file is this:

akka {
  actor {
    provider = remote
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "localhost"
      port = 50001
    }
  }
}

And the actor is defined like this:

class CyclonManager extends Actor {

  def propagateMessage(): Unit = {
    val localId = self.path.name.toInt
    val currentPort = 50000 + localId
    val nextHopPort = if (currentPort == 50001) 50016 else currentPort - 1
    val nextHopId = localId-1

    val nextHopRef = context.actorSelection(s"akka.tcp://ActorFlasks@localhost:$nextHopPort/user/$nextHopId")

    nextHopRef ! "NEXT"
  }

  override def receive: Receive = {
    case "START" =>
      if (self.path.name == "16") {
        propagateMessage()
      }
    case "NEXT" =>
      propagateMessage()
    case _ =>
      println("Unrecognized message")
  }
}

It is a simple example to get me started, but I can't get it working no matter what I try. Does anyone know what I'm doing wrong?

Thank you in advance,

EDIT:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "localhost"
      port = 50015
    }
  }
}

Solution

  • After reconstructing and running your example, I found one mistake in the propagateMessage function.

    val nextHopId = localId-1

    should be

    val nextHopId = if (currentPort == 50001) 16 else localId-1

    Without this change, actor 1 sends to the correct port (50016) but addresses /user/0, and no actor with that name exists.

    If that doesn't solve your issue, try running my quick-and-dirty but working code and see how it differs from yours: https://gist.github.com/grantzvolsky/4a53ce78610038a9d44788d7151dc416

    In my code I only used actors 14, 15, and 16. You can run each using sbt "run 16", etc.
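
The wrap-around fix above can also be expressed as a small helper that derives both the next-hop id and port from the local id, so the two cannot drift apart. This is only a minimal sketch: RingTopology, basePort, ringSize, and the helper names are illustrative and not part of the original code, and it assumes actor N always listens on port 50000 + N, as in the question.

```scala
// Sketch only: RingTopology and its members are hypothetical names.
object RingTopology {
  // Assumption: actor N listens on basePort + N, as in the question's configs.
  val basePort = 50000

  // Actor 1 wraps around to the last actor; every other actor talks to its predecessor.
  def nextHopId(localId: Int, ringSize: Int): Int =
    if (localId == 1) ringSize else localId - 1

  def portFor(id: Int): Int = basePort + id

  // Builds the path string in the same format the question passes to context.actorSelection.
  def nextHopPath(localId: Int, ringSize: Int): String = {
    val id = nextHopId(localId, ringSize)
    s"akka.tcp://ActorFlasks@localhost:${portFor(id)}/user/$id"
  }
}
```

Inside propagateMessage, the selection would then be built from RingTopology.nextHopPath(localId, 16), or with 3 as the ring size if you only run a few actors with consecutive ids starting at 1.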