Publish-Subscribe for Akka Actor FSM not working


I have a base trait and subclasses of it. Each subclass subscribes to the event stream using its own event class. For example, ActorFSM1 cares about InitEventImpl1, so it subscribes to that event. However, when I publish these specific events, no actor receives them.

trait InitEvent
case class InitEventImpl1() extends InitEvent
case class InitEventImpl2() extends InitEvent

class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
class ActorFSM1 extends ActorFSM[InitEventImpl1] {
  context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
}
class ActorFSM2 extends ActorFSM[InitEventImpl2] {
  context.system.eventStream.subscribe(self, classOf[InitEventImpl2] )
}

When I attempt to publish them as below, no one receives a message. What am I doing wrong?

 val system = ActorSystem("system")
 val actor = system.actorOf(Props(new ActorFSM1()) )
 system.eventStream.publish(InitEventImpl1())

Solution

  • There is no guarantee that the actor has been initialized when the message is published, nor that subscribe has been called before publish. This is a classic actor-construction race condition. The creation of the ActorRef is synchronous, but the creation of the actual Actor (which is when the subscription occurs) is asynchronous and could happen well after the ActorRef has been returned. You can test this by publishing a bunch of messages to the stream to see whether the Actor eventually sees some of them.

    trait InitEvent
    case class InitEventImpl1(id: Int) extends InitEvent
    
    class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
    class ActorFSM1 extends ActorFSM[InitEventImpl1] {
      context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
      def receive: Receive = {
        case InitEventImpl1(id) => println(id)
      }
    }
    

    Then publish a bunch of messages and see if it eventually gets some of them:

    val system = ActorSystem("system")
    val actor = system.actorOf(Props(new ActorFSM1()) )
    Seq.tabulate(100)(InitEventImpl1(_)).foreach(system.eventStream.publish)
    

    If the Actor absolutely must receive the message, I recommend either sending the message directly to the ActorRef or waiting for a message from the Actor to indicate that it is ready to receive messages.
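
    The "wait until ready" option could look roughly like the sketch below. It assumes classic Akka actors; the names Ready, Subscriber, Publisher and HandshakeDemo are hypothetical and not part of the question's code. The subscriber subscribes in preStart and only then announces itself, so the publisher never publishes before the subscription exists.

    ```scala
    import akka.actor.{Actor, ActorRef, ActorSystem, Props}

    trait InitEvent
    case class InitEventImpl1(id: Int) extends InitEvent

    // Hypothetical handshake message, introduced only for this sketch.
    case object Ready

    // Subscribes first, then tells the interested party that it is ready.
    class Subscriber(notifyWhenReady: ActorRef) extends Actor {
      override def preStart(): Unit = {
        context.system.eventStream.subscribe(self, classOf[InitEventImpl1])
        notifyWhenReady ! Ready
      }

      def receive: Receive = {
        case InitEventImpl1(id) => println(s"received event $id")
      }
    }

    // Creates the subscriber and publishes only after the Ready handshake.
    class Publisher extends Actor {
      // Props(classOf[...], args) avoids closing over this actor's internals.
      context.actorOf(Props(classOf[Subscriber], self))

      def receive: Receive = {
        case Ready => context.system.eventStream.publish(InitEventImpl1(1))
      }
    }

    object HandshakeDemo extends App {
      val system = ActorSystem("system")
      system.actorOf(Props(new Publisher))
    }
    ```

    The cost of this design is one extra message per subscriber, in exchange for not depending on how quickly the actor happens to start.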

    On another note, your use of generics here doesn't seem to help you at all. It seems that what you want is to use the type parameter in both the subscribe call and receive. This can be done with a ClassTag from scala.reflect:

    import akka.actor.Actor
    import scala.reflect._
    
    class Test[E <: InitEvent : ClassTag] extends Actor {
       context.system.eventStream.subscribe(self, classTag[E].runtimeClass)
       def receive: Receive = {
         case message: E => println(message)
       }
    }
    

    Then specify the message type when creating the ActorRef:

    val myTypeActor = system.actorOf(Props(new Test[InitEventImpl1]))
    myTypeActor ! InitEventImpl1(1) // Will be processed
    myTypeActor ! InitEventImpl2()  // Will not be processed (not an InitEventImpl1)