scala concurrency akka actor message

Can actors read messages under a certain condition?


I have this situation:

  • ActorA sends ActorB start/stop messages every 30-40 seconds
  • ActorA sends ActorB strings to print (always)
  • ActorB must print the strings it receives, but only while the last control message from ActorA was a start

Now I wonder whether I can do the following:

  • Can ActorB read messages only under a certain condition (if a boolean is set to true) without losing the messages it receives while that boolean is false?
  • Can ActorB read a start/stop message from ActorA before the other string messages? I'd like this behaviour: ActorA sends a start message to ActorB; ActorB starts printing the strings it received before the start message (and the ones it is still receiving), and then stops as soon as it receives a stop message.

I don't know if I explained it well.

EDIT: Thank you, the answers are great, but I still have some doubts.

  • Does become maintain the order of the messages? I mean, if I have "Start-M1-M2-Stop-M3-M4-M5-Start-M6-M7-Stop", will the printing order be "M1-M2" and then "M3-M4-M5-M6-M7", or could M6 be read before M3, M4 and M5 (if M6 is received just after the become)?

  • Can I give a higher priority to start/stop messages? If ActorB receives "M1-M2-M3" and then receives a stop message while it is printing "M1", I want ActorB to stash M2 and M3 again.


Solution

  • You can solve exactly this problem with the Stash trait and Akka's become/unbecome functionality. The idea is the following:

    When you receive a Stop message, you switch to a behaviour that stashes every message except Start. When you receive a Start message, you switch to a behaviour that prints every string it receives, and you also unstash all the messages that arrived in the meantime.

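    import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash}
    import scala.concurrent.duration._   // provides the `5 seconds` syntax
    import scala.language.postfixOps     // allows `5 seconds` without the dot

    case object Start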
    case object Stop
    case object TriggerStateChange
    case object SendMessage
    
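    class ActorB extends Actor with Stash {
      // Initial (stopped) behaviour: hold back everything until Start arrives.
      override def receive: Receive = {
        case Start =>
          // Push the printing behaviour on top of this one so that unbecome() can pop it later.
          context.become(printingBehavior, discardOld = false)
          // Move the stashed messages back to the front of the mailbox, in their original order.
          unstashAll()
        case _ => stash()
      }
    
      // Started behaviour: print strings until Stop arrives.
      def printingBehavior: Receive = {
        case msg: String => println(msg)
        case Stop => context.unbecome() // back to the stashing behaviour
      }
    }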
    
    class ActorA(val actorB: ActorRef) extends Actor {
    
      var counter = 0
      var started = false
    
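      override def preStart(): Unit = {
        // The scheduler needs an implicit ExecutionContext.
        import context.dispatcher
    
        // Toggle ActorB between started and stopped every 5 seconds ...
        this.context.system.scheduler.schedule(0 seconds, 5 seconds, self, TriggerStateChange)
        // ... and send it a new string every second.
        this.context.system.scheduler.schedule(0 seconds, 1 seconds, self, SendMessage)
      }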
    
      override def receive: Actor.Receive = {
        case SendMessage =>
          actorB ! "Message: " + counter
          counter += 1
        case TriggerStateChange =>
          actorB ! (if (started) {
            started = false
            Stop
          } else {
            started = true
            Start
          })
      }
    }
    
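    object Akka {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem.create("TestActorSystem")
    
        val actorB = system.actorOf(Props(classOf[ActorB]), "ActorB")
        val actorA = system.actorOf(Props(classOf[ActorA], actorB), "ActorA")
    
        // Block the main thread until the actor system shuts down.
        // On Akka 2.4 and later, use Await.result(system.whenTerminated, Duration.Inf) instead.
        system.awaitTermination()
      }
    }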
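
On the ordering question from the EDIT: unstashAll() puts the stashed messages back at the front of the mailbox in the order they were stashed, so M3, M4 and M5 should be printed before M6. The sketch below is not Akka code. It is a plain-Scala model of ActorB's mailbox, stash and two behaviours, written only to illustrate that ordering. The names StashOrderSketch, Msg, Text and replay are hypothetical and exist only in this example.

```scala
// A plain-Scala model of ActorB (no Akka involved): an assumption-laden sketch
// of how stash()/unstashAll() affect processing order.
object StashOrderSketch {
  sealed trait Msg
  case object Start extends Msg
  case object Stop extends Msg
  final case class Text(value: String) extends Msg

  // Processes the incoming messages one at a time, like an actor would,
  // and returns the strings in the order they would be printed.
  def replay(incoming: Seq[Msg]): Vector[String] = {
    var mailbox  = incoming.toVector
    var stash    = Vector.empty[Msg]
    var printing = false // false models the initial stashing behaviour
    var printed  = Vector.empty[String]

    while (mailbox.nonEmpty) {
      val msg = mailbox.head
      mailbox = mailbox.tail
      (printing, msg) match {
        case (true, Text(s)) => printed :+= s      // println(msg)
        case (true, Stop)    => printing = false   // context.unbecome()
        case (true, Start)   => ()                 // not handled while printing
        case (false, Start)  =>
          printing = true                          // context.become(printingBehavior)
          mailbox = stash ++ mailbox               // unstashAll(): stash goes to the front
          stash = Vector.empty
        case (false, other)  => stash :+= other    // stash()
      }
    }
    printed
  }

  def main(args: Array[String]): Unit = {
    val m = (1 to 7).map(i => Text(s"M$i"))
    val input: Seq[Msg] =
      Seq(Start, m(0), m(1), Stop, m(2), m(3), m(4), Start, m(5), m(6), Stop)
    println(replay(input).mkString("-")) // prints M1-M2-M3-M4-M5-M6-M7
  }
}
```

In this model M6 is already waiting in the mailbox when the second Start is processed, and it still comes out after M3, M4 and M5 because the stash is prepended to the mailbox rather than appended.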