Search code examples
scalaakkaactor

How to add "default" Receive behaviour to an AKKA Actor that is able to change its behaviour?


Background:

I have an Actor called Peer. Within my application, a Peer represents a connection between the server that created it and another server. It is essentially a http-client.

When the instance is first ran, the PeerManager queries the database for the list of server entries. It then creates a Peer actor for that server. The state of the Peer depends on its RegistrationStatus field (PENDING_OUT, PENDING_IN, REGISTERED, ...)

A Peer has a default receive function that looks like this:

  override def receive: Receive = {
    case FirstTick =>
      system.log.info("receive FirstTick")
      timers.startTimerWithFixedDelay(TickKey, Tick, FiniteDuration(2, TimeUnit.SECONDS))
    case Tick =>
      system.log.info("receive Tick")
      // process every state until all are complete
      var statesLeft = false
      
      states = states.sortWith{ (s1, s2) => s1.order > s2.order }

      for (state <- states) {
        if (!state.complete) {
          statesLeft = true
          if (state.waitForPing) {
            context.become(pingService(state))
          } else {
            context.become(state.behavior)
          }
        }
      }

      if (!statesLeft) context.become(pingService(null))
  }

This basically allows a Peer to transition through a list of states, executing each state's behaviour until the state is complete.

A set of state might look like this:

    case PENDING_OUT =>
        Seq(
          new InitState(0, peer),
          new SessionHandshake(1, peer),
          new WaitMessages(2, peer)
        )

An example of one of the states:

class WaitAccept(order: Int, peer: Peer)(implicit system: ActorSystem) extends PeerState(order, false, true) {

  override def behavior: Receive = {
    case Tick =>
      system.log.info("Waiting for accept")
      if (complete) peer.getContext.unbecome()

    case rA@RegistrationAccept(serverID, serverIP, _) =>
      system.log.info(s"Processing RegistrationAccept: ${rA.toProtoString}")
      complete = true
  }
  
}

PeerState superclass:

abstract case class PeerState(var order: Int, var complete: Boolean, var waitForPing: Boolean) {

  def behavior: Receive // <- must be overridden
  
}

This state causes the peer to sit and wait for a 'RegistrationAccept' message, generated when the user clicks 'Accept' on the webpage.

The Problem:

Some of the states (for example, Handshake state that is responsible for exchanging encryption keys) send http messages to the server that the Peer is generated for. If the instance receiving the message finds a Peer with the requested serverID, the PeerManager will forward that message onto the Peer using ask like so:

case peerRequest@PeerRequest(serverID, SessionRequest(sessionRequest), _) =>
      // get the relevant peer for the requested serverID
      val peerEntry = peerMap.get(serverID)
      if (peerEntry.isDefined) {
        // ask the peer Actor for a response to the sessionRequest, block and wait for this response, then send it back to the peerManager

        val peerState = queryPeerState(peerEntry.get)    // <-- the issue

        if (peerState.isInstanceOf[SessionHandshake]) {  // <-- the issue
          try {
            sender() ! Await.result((peerEntry.get ? (self, sessionRequest)).mapTo[PeerResponse.Response], timeout.duration)
          } catch {
            case e: TimeoutException =>
              sender() ! PeerResponse.Response.StatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
          }
        } else {
          sender() ! PeerResponse.Response.StatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
        }
      } else {
        sender() ! PeerResponse.Response.StatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
      }

The problem occurs when the other instance's Peer is not in the correct state to receive the message. If this happens, I would like to respond with the appropriate HydraStatusCode. The most normal approach to me is to somehow ask the Peer what state it is in before forwarding on the message, and if in the wrong state, return the appropriate StatusCode.

However, to achieve this, I would have to implement QueryState case in every place I have Receive behaviour. This seems clunky and messy and as a skilled engineer does not feel the right way to go about this.

I would like to somehow implement this into the PeerState class as default behaviour that always gets called for a QueryState, or somehow allow the PeerManager to call a reference to Peer object directly instead of to the ActorRef (but I know this isn't possible)

Are there any other approaches to this or is the messy way probably the best?


Solution

  • If I understood your question correctly (?) (I'm not sure because there's quite a lot of context and a lot of code) you want the same handling of QueryState messages for any behavior (of any state) without repeating the related message-handling code.

    You could leverage the fact that Receive is just a PartialFunction and those can be combined with orElse.

    So you could define (in your Peer):

    private def handleQueryState: Receive = {
      case q: QueryState => // handle it here
    }
    

    Then, in your Peer's def receive, instead of doing context.become(state.behavior), you could do context.become(state.behavior.orElse(handleQueryState)).

    I hope this helps.