I have basic trait and sub classes for this trait. Each subclass subscribes to the event stream using its own event class. For example ActorFSM1 cares about InitEventImpl1 so it will subscribe to this event. However when I publish these specific events, no actor receive them.
trait InitEvent
case class InitEventImpl1 extends InitEvent
case class InitEventImpl2 extends InitEvent
class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
class ActorFSM1 extends ActorFSM[InitEventImpl1] {
context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
}
class ActorFSM2 extends ActorFSM[InitEventImpl2] {
context.system.eventStream.subscribe(self, classOf[InitEventImpl2] )
}
When I attempt to publish them as below, no one receives a message. What am I doing wrong?
val system = ActorSystem("system")
val actor = system.actorOf(Props(new ActorFSM1()) )
system.eventStream.publish(InitEventImpl1())
There is no guarantee that the actor has been initialized when the message is published, nor is there any guarantee that the call to subscribe
has been called before publish
. This is a classic actor-construction race condition. The creation of the ActorRef
is synchronous but the creation of the actual Actor
- when the subscription occurs - is asynchronous and could happen well after the creation of the ActorRef
. You can test this by sending a bunch of messages to the stream to see if the Actor
eventually sees some of them.
trait InitEvent
case class InitEventImpl1(id: Int) extends InitEvent
class ActorFSM[T <: InitEvent] extends FSM[s, Data] {}
class ActorFSM1 extends ActorFSM[InitEventImpl1] {
context.system.eventStream.subscribe(self, classOf[InitEventImpl1] )
def receive: Receive = {
case InitEventImpl1(id) => println(id)
}
}
Then publish a bunch of messages and see if it eventually gets some of them:
val system = ActorSystem("system")
val actor = system.actorOf(Props(new ActorFSM1()) )
Seq.tabulate(100)(InitEventImpl1(_)).foreach(system.eventStream.publish)
If the Actor
absolutely must receive the message, I recommend either sending the message directly to the ActorRef
or waiting for a message from the Actor
to indicate that it is ready to receive messages.
On another note, your use of Generics here doesn't seem to help you at all. It seems what you want to do is use the generic parameter in the subscribe
call and receive
. This can be done with some reflection magic:
import akka.actor.Actor
import scala.reflect._
class Test[E <: InitEvent : ClassTag] extends Actor {
context.system.eventStream.subscribe(self, classTag[E].runtimeClass)
def receive: Receive = {
case message: E => println(message)
}
}
Then specify the message type when creating the ActorRef
val myTypeActor = system.actorOf(Props(new Test[InitEventImpl1]))
myTypeActor ! InitEventImpl1 // Will be processed
myTypeActor ! InitEventImpl2 // Will not be processed