Search code examples
scalaakkaactorshardingakka-typed

Priority Mailbox with Akka Typed Actors and Cluster Sharding


I have a cluster sharding application with typed actors. The actor looks like this:

object TestActor {

  sealed trait Command
  final case class Inte(i: Int) extends Command
  final case class Stringaki(s: String) extends Command

  val TypeKey = EntityTypeKey[Command]("Test")

  def defaultThreadBehavior(id: String): Behavior[Command] = Behaviors.setup { ctx =>

    Behaviors.receiveMessage { cmd =>
      cmd match {
        case Inte(i) =>
          ctx.log.info(System.currentTimeMillis()/1000 + " Received int: " + i)
          Thread.sleep(1000)
        case Stringaki(s) =>
          ctx.log.info(System.currentTimeMillis()/1000 + " Received string: " + s)
          Thread.sleep(1000)
      }
      Behaviors.same
    }
  }

}

The actors are created through the Sharding Envelope like this:

val system_config = ConfigFactory.parseString(
      """
        |akka {
        |  actor {
        |    provider = "cluster"
        |    prio-dispatcher {
        |      type = "Dispatcher"
        |      mailbox-type = "PriorityMailbox"
        |    }
        |  }
        |  remote {
        |    netty.tcp {
        |      hostname = "127.0.0.1"
        |      port = 2551
        |    }
        |  }
        |  cluster {
        |    seed-nodes = [
        |      "akka.tcp://[email protected]:2551"
        |    ]
        |    sharding {
        |      number-of-shards = 10
        |      use-dispatcher = "akka.actor.prio-dispatcher"
        |    }
        |  }
        |}
        |""".stripMargin)

    val system = ActorSystem(Behaviors.empty[TestActor.Command], "TestApp",system_config)
    val sharding = ClusterSharding(system)

    val shardRegion = sharding.init(Entity(TestActor.TypeKey, ctx => defaultThreadBehavior(ctx.entityId)))

    (0 to 9).foreach{
      i =>
        shardRegion ! ShardingEnvelope(0.toString, Inte(i))
    }

    (0 to 9).foreach{
      i =>
        shardRegion ! ShardingEnvelope(0.toString, Stringaki(i.toString))
    }

Two for loops send messages to the same actor. The first loop sends Integers and the second loop sends Strings. When the actor processes the messages it sleeps in order to build up messages in the queue and test the priority. The priority mailbox is configured in the system config and the UnboundedPriorityMailbox implementation is the following:

class PriorityMailbox (settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(

  PriorityGenerator {
    case Stringaki => 0
    case _ => 1
  }

)

Why does the Actor print the messages in the order that they arrive and does not take into account the priority generator?


Solution

  • The short answer to why you're not seeing the effect of the priority mailbox is that your TestActors aren't using the priority mailbox, but the default mailbox. Only the Akka Cluster sharding system is using the priority mailbox. Cluster sharding reference.conf description of akka.cluster.sharding.use-dispatcher:

    # The id of the dispatcher to use for ClusterSharding actors.
    # If specified you need to define the settings of the actual dispatcher.
    # This dispatcher for the entity actors is defined by the user provided
    # Props, i.e. this dispatcher is not used for the entity actors.
    

    It is true that every message you're sending goes through the priority mailbox, but since the actors internal to cluster sharding aren't sleeping, there's no backlog developing (though in some situations, especially with fewer cores, there might be a backlog where prioritization could shine).

    To have the entity actors run in a dispatcher with a priority mailbox, you would have something like

    val entityDispatcherProps = DispatcherSelector.fromConfig("akka.actor.prio-dispatcher")
    val baseEntity = Entity(TestActor.TypeKey)(ctx => defaultThreadBehavior(ctx.entityId))
    val shardRegion = sharding.init(baseEntity.withEntityProps(entityDispatcherProps))