Search code examples
scalaakkaakka-clusterakka-persistence

Akka cluster sharding doesn't work


I'm using akka 2.4.7 on Scala 2.11.8.

Events are coming to extractShardId and extractEntityId but they are not being propagated to the receive method of the actor. Any ideas of what could it be?

https://bitbucket.org/kuzelac/apt-billing/overview branch price-shard

Have defined a cluster shard:

val dailyPriceAggregateActor: ActorRef = ClusterSharding(context.system).start(
typeName = "DailyPriceAggregateActor",
entityProps = DailyPriceAggregateActor(),
settings = ClusterShardingSettings(context.system),
extractEntityId = DailyPriceAggregateActor.extractEntityId,
extractShardId = DailyPriceAggregateActor.extractShardId)

The actor object is

object DailyPriceAggregateActor {
  def apply() = Props(classOf[DailyPriceAggregateActor])

val extractEntityId: ShardRegion.ExtractEntityId = {
  case e@DailyPriceSaved(userId, unitId, _, _, _) => (s"$userId$unitId", e)
  case e@LookupPriceForDay(userId, unitId, _) => (s"$userId$unitId", e)
}

val extractShardId: ShardRegion.ExtractShardId = {
  case _ => "one"
}
}

I've set the conf to

akka.actor.provider = "akka.cluster.ClusterActorRefProvider"

Actor is being seeded by Persistence Query

  def startSync(actor: ActorRef) = {
    val queries = PersistenceQuery(context.system).readJournalFor[ScalaDslMongoReadJournal](MongoReadJournal.Identifier)

    val src: Source[EventEnvelope, NotUsed] =
  queries.eventsByPersistenceId(PriceAggregateActor.persistenceId, 0L, Long.MaxValue)

    src.runForeach(actor ! _.event)
  }

Solution

  • Forgot to add seed-nodes

    seed-nodes = [
      "akka.tcp://{your-actor-system-name}@127.0.0.1:8999"
    ]