How do I call context become outside of a Future from Ask messages?


I have a parent Akka actor named buildingCoordinator that creates children named elevator_X. For now I am creating only one elevator. The buildingCoordinator sends a sequence of messages and waits for the responses in order to move an elevator. The sequence is: send ? RequestElevatorState -> receive ElevatorState -> send ? MoveRequest -> receive MoveRequestSuccess -> change the state. As you can see, I am using the ask pattern. After the move succeeds, the buildingCoordinator changes its state using context.become.

The problem I am running into is that the elevator receives MoveRequest(1,4) for the same floor twice, sometimes three times. I do remove the floor when I call context.become, but I remove it inside the last map. I think this happens because I am calling context.become inside a Future when I should call it outside, but I am having trouble implementing that.

case class BuildingCoordinator(actorName: String,
                               numberOfFloors: Int,
                               numberOfElevators: Int,
                               elevatorControlSystem: ElevatorControlSystem)
  extends Actor with ActorLogging {
  import context.dispatcher
  implicit val timeout = Timeout(4 seconds)
  val elevators = createElevators(numberOfElevators)

  override def receive: Receive = operational(Map[Int, Queue[Int]](), Map[Int, Queue[Int]]())

  def operational(stopsRequests: Map[Int, Queue[Int]], pickUpRequests: Map[Int, Queue[Int]]): Receive = {
    case msg@MoveElevator(elevatorId) =>
      println(s"[BuildingCoordinator] received $msg")
      val elevatorActor: ActorSelection = context.actorSelection(s"/user/$actorName/elevator_$elevatorId")
      val newState = (elevatorActor ? RequestElevatorState(elevatorId))
        .mapTo[ElevatorState]
        .flatMap { state =>
          val nextStop = elevatorControlSystem.findNextStop(stopsRequests.get(elevatorId).get, state.currentFloor, state.direction)
          elevatorActor ? MoveRequest(elevatorId, nextStop)
        }
        .mapTo[MoveRequestSuccess]
        .flatMap(moveRequestSuccess => elevatorActor ? MakeMove(elevatorId, moveRequestSuccess.targetFloor))
        .mapTo[MakeMoveSuccess]
        .map { makeMoveSuccess =>
          println(s"[BuildingCoordinator] Elevator ${makeMoveSuccess.elevatorId} arrived at floor [${makeMoveSuccess.floor}]")
          // removeStopRequest
          val stopsRequestsElevator = stopsRequests.get(elevatorId).getOrElse(Queue[Int]())
          val newStopsRequestsElevator = stopsRequestsElevator.filterNot(_ == makeMoveSuccess.floor)
          val newStopsRequests = stopsRequests + (elevatorId -> newStopsRequestsElevator)

          val pickUpRequestsElevator = pickUpRequests.get(elevatorId).getOrElse(Queue[Int]())
          val newPickUpRequestsElevator = {
            if (pickUpRequestsElevator.contains(makeMoveSuccess.floor)) {
              pickUpRequestsElevator.filterNot(_ == makeMoveSuccess.floor)
            } else {
              pickUpRequestsElevator
            }
          }
          val newPickUpRequests = pickUpRequests + (elevatorId -> newPickUpRequestsElevator)
          // I THINK I SHOULD NOT CALL context.become HERE
          // context.become(operational(newStopsRequests, newPickUpRequests))

          val dropOffFloor = BuildingUtil.generateRandomFloor(numberOfFloors, makeMoveSuccess.floor, makeMoveSuccess.direction)
          context.self ! DropOffRequest(makeMoveSuccess.elevatorId, dropOffFloor)

          (newStopsRequests, newPickUpRequests)
        }
      // I MUST CALL context.become HERE, BUT I DONT KNOW HOW
      // context.become(operational(newState.flatMap(state => (state._1, state._2))))
  }
}

Another thing that might be nasty here is the big chain of map and flatMap calls. This is how I implemented it, but I think there may be a better way.


Solution

  • You cannot, and should not, call context.become or otherwise change actor state outside the Receive method or outside the thread that invokes it (an Akka dispatcher thread), as your example does. For example:

    def receive: Receive  = {
       // This is a bug, because context is not and is not supposed to be thread safe.
       case message: Message => Future(context.become(anotherReceive))
    }
    

    What you should do instead: when the async operation finishes, send a message to self, and change the state when you receive that message. If you don't want to handle incoming messages in the meantime, you can stash them. For more details see: https://doc.akka.io/docs/akka/current/typed/stash.html

    High level example, technical details omitted:

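    import akka.actor.{Actor, Stash, Status}
    import akka.pattern.pipe
    import scala.concurrent.Future
    
    case object StartAsyncOperation
    case class OperationFinished(calculations: Map[Any, Any])
    
    class AsyncActor extends Actor with Stash {
      import context.dispatcher // ExecutionContext needed by map and pipeTo
    
      def operation: Future[Map[Any, Any]] = ... // some implementation of a heavy async operation
    
      def receive: Receive = receiveStartAsync(Map.empty)
    
      def receiveStartAsync(calculations: Map[Any, Any]): Receive = {
        case StartAsyncOperation =>
          // Start the async operation and send its result to yourself when it finishes
          operation.map(OperationFinished.apply) pipeTo self
          context.become(receiveWaitAsyncOperation(calculations))
      }
    
      def receiveWaitAsyncOperation(calculations: Map[Any, Any]): Receive = {
        case OperationFinished(newCalculations) =>
          // Back on the actor's own thread, so changing state is safe here
          unstashAll()
          context.become(receiveStartAsync(newCalculations))
    
        case Status.Failure(_) =>
          // The Future failed: keep the previous state and resume
          unstashAll()
          context.become(receiveStartAsync(calculations))
    
        case _ => stash() // postpone everything else until the operation finishes
      }
    }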
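
Applied to the coordinator from the question, the same idea might look roughly like the sketch below. It is a simplified illustration under stated assumptions, not the asker's actual code: the message types are cut-down versions of those in the question, MoveFinished is a hypothetical internal message, the MakeMove step and the pick-up requests are left out, and choosing the nearest stop is only a stand-in for elevatorControlSystem.findNextStop. The ask chain is written as a for-comprehension, its result is piped to self, and context.become is only ever called from inside receive.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash, Status}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.collection.immutable.Queue
import scala.concurrent.duration._

// Simplified protocol modelled on the question's messages (assumption).
final case class MoveElevator(elevatorId: Int)
final case class RequestElevatorState(elevatorId: Int)
final case class ElevatorState(elevatorId: Int, currentFloor: Int)
final case class MoveRequest(elevatorId: Int, floor: Int)
final case class MoveRequestSuccess(elevatorId: Int, targetFloor: Int)
// Hypothetical internal message carrying the result of the whole ask chain.
final case class MoveFinished(elevatorId: Int, floor: Int)

class Coordinator(elevator: ActorRef, initialStops: Map[Int, Queue[Int]])
    extends Actor with Stash {
  import context.dispatcher
  implicit val timeout: Timeout = Timeout(4.seconds)

  def receive: Receive = idle(initialStops)

  // No move in flight: a MoveElevator starts the ask chain.
  def idle(stops: Map[Int, Queue[Int]]): Receive = {
    case MoveElevator(id) if stops.get(id).exists(_.nonEmpty) =>
      val result = for {
        state <- (elevator ? RequestElevatorState(id)).mapTo[ElevatorState]
        // Stand-in for findNextStop: pick the stop nearest to the current floor.
        nextStop = stops(id).minBy(floor => math.abs(floor - state.currentFloor))
        moved <- (elevator ? MoveRequest(id, nextStop)).mapTo[MoveRequestSuccess]
      } yield MoveFinished(moved.elevatorId, moved.targetFloor)
      // The Future only produces a message; it never touches actor state.
      result.pipeTo(self)
      context.become(moving(stops))
  }

  // A move is in flight: wait for its result and stash everything else.
  def moving(stops: Map[Int, Queue[Int]]): Receive = {
    case MoveFinished(id, floor) =>
      val remaining = stops.getOrElse(id, Queue.empty[Int]).filterNot(_ == floor)
      unstashAll()
      context.become(idle(stops + (id -> remaining)))
    case Status.Failure(_) =>
      // An ask timed out or failed: keep the old stops and accept new requests.
      unstashAll()
      context.become(idle(stops))
    case _ => stash()
  }
}
```

Because a second MoveElevator that arrives while a move is in flight is stashed, it is only processed against the updated stops map, so the same floor should not be requested twice.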