Search code examples
akkaakka-typed

Akka ask pattern: how to make request and replies context-aware?


For a University assignment, I have to implement a simulation of the Raft protocol in Akka (I am using Akka typed, using Behaviors). In the Raft protocol, interactions between actors have a 1:1 mapping between a request and a response; responses must be delivered in a timely manner. Therefore, it makes sense to use the ask pattern as demonstrated by the documentation in the Request-Response with ask between two actors example.

In my implementation, requests and responses must be context-aware: this means that, when an actor that performed a query receives a response, it must know what query the response was for. The example in the documentation suggests to include a query ID in the message.

What I need to solve can be described with the following example:

  • Actor A sends a query with ID=1 to actor B (it includes the query ID in the message).
  • B does not reply in time (the network, or B itself, may be slow), thus A re-issues a query with ID=2 to B.
  • Actor B receives the query with ID=1, and replies to actor A (including the query ID in the message).
  • Actor A receives B's reply with ID=1. A knows that the last query it sent had ID=2, thus must NOT process the reply but wait for the one with ID=2.

I think that, to "filter" replies that do not have a correct query ID, I can put a BehaviorInterceptor in actor A that checks that the ID in the reply matches the expected query ID.

To summarize:

  • Actor A writes in a hashmap the query ID to be expected from actor B's next reply,
  • The interceptor uses this hashmap to check the ID in the reply. Is this a good design?

Moreover, I don't understand whether ask is blocking or not. Ideally, I would like to use ask in a non-blocking way: actor A asks actor B, and, while waiting for B's reply, A can do other operations. While waiting for B's reply, actor A can also change its behavior if needed (also a Behavior that does not handle B's replies).

Thank you for any insight!


Solution

  • An ask between two actors (using the ActorContext) is non-blocking.

    Since the high watermark of the requests to a given target is an important part of protocol state for the actor, I would just store it in the asking actor's state (e.g. in Scala a Map[ActorRef[Request], Int]). The adapted response contains the target and the id it's in response to (you define how this is incorporated when performing the ask); when receiving the adapted response, the first thing is comparing the id in the response to the high watermark for the target.

    In Scala, for example:

    sealed trait RequestA
    case class QueryB(target: ActorRef[RequestB]) extends RequestA
    case class ResponseFromB(target: ActorRef[RequestB], id: Int, resp: ResponseB) extends RequestA
    case class BTimedOut(target: ActorRef[RequestB], id: Int) extends RequestA
    
    sealed trait RequestB
    
    def buildRequestB(id: Int)(replyTo: ResponseB): RequestB = ???
    
    sealed trait ResponseB
    
    def aBehavior(highWater: Map[ActorRef[RequestB], Int]): Behavior[RequestA] =
      Behaviors.receive { (context, msg) =>
        case QueryB(target) =>
          implicit val timeout: Timeout = 10.seconds
    
          val nextHighwater = highWater.get(target).map(_ + 1).getOrElse(0)
    
          // request is sent and received "in the background"
          context.ask(target, buildRequestB(nextHighwater)) {
            case Success(resp) => ResponseFromB(target, nextHighwater, resp)
            case Failure(_) => BTimedOut(target, nextHighwater)
          }
    
          aBehavior(highWater + (target -> nextHighwater))
    
        case ResponseFromB(target, id, resp) =>
          if (highWater.get(target).contains(id)) {
            context.log.info("Accepting response: {}", resp)
            Behaviors.same
          } else {
            context.log.info("Ignoring response: {}", resp)
            Behaviors.same
          }
    
        case BTimedOut(target, id) =>
          context.log.warning("Ask of {} (sequence ID {}) timed out", target, id)
          Behaviors.same
      }