Background:
I have an Actor called Peer
. Within my application, a Peer
represents a connection between the server that created it and another server. It is essentially a http-client
.
When the instance is first ran, the PeerManager
queries the database for the list of server entries. It then creates a Peer
actor for that server. The state of the Peer
depends on its RegistrationStatus
field (PENDING_OUT, PENDING_IN, REGISTERED, ...)
A Peer
has a default receive function that looks like this:
override def receive: Receive = {
case FirstTick =>
system.log.info("receive FirstTick")
timers.startTimerWithFixedDelay(TickKey, Tick, FiniteDuration(2, TimeUnit.SECONDS))
case Tick =>
system.log.info("receive Tick")
// process every state until all are complete
var statesLeft = false
states = states.sortWith{ (s1, s2) => s1.order > s2.order }
for (state <- states) {
if (!state.complete) {
statesLeft = true
if (state.waitForPing) {
context.become(pingService(state))
} else {
context.become(state.behavior)
}
}
}
if (!statesLeft) context.become(pingService(null))
}
This basically allows a Peer
to transition through a list of states, executing each state's behaviour until the state is complete.
A set of state might look like this:
case PENDING_OUT =>
Seq(
new InitState(0, peer),
new SessionHandshake(1, peer),
new WaitMessages(2, peer)
)
An example of one of the states:
class WaitAccept(order: Int, peer: Peer)(implicit system: ActorSystem) extends PeerState(order, false, true) {
override def behavior: Receive = {
case Tick =>
system.log.info("Waiting for accept")
if (complete) peer.getContext.unbecome()
case rA@RegistrationAccept(serverID, serverIP, _) =>
system.log.info(s"Processing RegistrationAccept: ${rA.toProtoString}")
complete = true
}
}
PeerState
superclass:
abstract case class PeerState(var order: Int, var complete: Boolean, var waitForPing: Boolean) {
def behavior: Receive // <- must be overridden
}
This state causes the peer to sit and wait for a 'RegistrationAccept' message, generated when the user clicks 'Accept' on the webpage.
The Problem:
Some of the states (for example, Handshake
state that is responsible for exchanging encryption keys) send http
messages to the server
that the Peer
is generated for. If the instance receiving the message finds a Peer
with the requested serverID
, the PeerManager
will forward that message onto the Peer
using ask
like so:
case peerRequest@PeerRequest(serverID, SessionRequest(sessionRequest), _) =>
// get the relevant peer for the requested serverID
val peerEntry = peerMap.get(serverID)
if (peerEntry.isDefined) {
// ask the peer Actor for a response to the sessionRequest, block and wait for this response, then send it back to the peerManager
val peerState = queryPeerState(peerEntry.get) // <-- the issue
if (peerState.isInstanceOf[SessionHandshake]) { // <-- the issue
try {
sender() ! Await.result((peerEntry.get ? (self, sessionRequest)).mapTo[PeerResponse.Response], timeout.duration)
} catch {
case e: TimeoutException =>
sender() ! PeerResponse.Response.StatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
}
} else {
sender() ! PeerResponse.Response.StatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
}
} else {
sender() ! PeerResponse.Response.StatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
}
The problem occurs when the other instance's Peer
is not in the correct state to receive the message. If this happens, I would like to respond with the appropriate HydraStatusCode
. The most normal approach to me is to somehow ask the Peer
what state it is in before forwarding on the message, and if in the wrong state, return the appropriate StatusCode.
However, to achieve this, I would have to implement QueryState
case in every place I have Receive
behaviour. This seems clunky and messy and as a skilled engineer does not feel the right way to go about this.
I would like to somehow implement this into the PeerState
class as default behaviour that always gets called for a QueryState
, or somehow allow the PeerManager
to call a reference to Peer
object directly instead of to the ActorRef (but I know this isn't possible)
Are there any other approaches to this or is the messy way probably the best?
If I understood your question correctly (?) (I'm not sure because there's quite a lot of context and a lot of code) you want the same handling of QueryState
messages for any behavior (of any state) without repeating the related message-handling code.
You could leverage the fact that Receive
is just a PartialFunction
and those can be combined with orElse
.
So you could define (in your Peer
):
private def handleQueryState: Receive = {
case q: QueryState => // handle it here
}
Then, in your Peer
's def receive
, instead of doing context.become(state.behavior)
, you could do context.become(state.behavior.orElse(handleQueryState))
.
I hope this helps.