Search code examples
scalaakkafutureactorakka-stream

Recover on Akka ask based on the message sent


I am sending different messages to the actor via ask. On timeout I'd like to provide a default value which is different for messages being asked to the actor.

Since the Timeout Exception is always the same, I can not use it in recover to return different default values I need the original message being sent.

How can one achieve that.

Code example:

      val storageActorProxy = Flow[ByteString]
        .via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
        .map(TCPMessage.decode)
        .ask[OperationResponse](storageActor)
        //TODO: looking for this recover; non-existent AFAIK
        .customRecover { 
            case Op1 => DefaultResponseA()
            case Op2 => DefaultResponseB()
        }
        .map(TCPMessage.encode(_).toByteString)


Solution

  • Akka's ask method is actually pretty easy to recreate - it is just a mapAsync with some extra logic for better errors when the actor dies (see the code). As such, just use mapAsync manually so you can recover the ask error.

    val storageActorProxy = Flow[ByteString]
      .via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
      .map(TCPMessage.decode)
      .mapAsync(parallelism = 2) { decodedMessage =>
         (storageActor ? decodedMessage).recover {
           case Op1 => DefaultResponseA()
           case Op2 => DefaultResponseB()
         }
      }
      .map(TCPMessage.encode(_).toByteString)