I am sending different messages to the actor via ask
. On timeout I'd like to provide a default value which is different for messages being asked to the actor.
Since the Timeout Exception is always the same, I can not use it in recover
to return different default values I need the original message being sent.
How can one achieve that.
Code example:
val storageActorProxy = Flow[ByteString]
.via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
.map(TCPMessage.decode)
.ask[OperationResponse](storageActor)
//TODO: looking for this recover; non-existent AFAIK
.customRecover {
case Op1 => DefaultResponseA()
case Op2 => DefaultResponseB()
}
.map(TCPMessage.encode(_).toByteString)
Akka's ask
method is actually pretty easy to recreate - it is just a mapAsync
with some extra logic for better errors when the actor dies (see the code). As such, just use mapAsync
manually so you can recover the ask error.
val storageActorProxy = Flow[ByteString]
.via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
.map(TCPMessage.decode)
.mapAsync(parallelism = 2) { decodedMessage =>
(storageActor ? decodedMessage).recover {
case Op1 => DefaultResponseA()
case Op2 => DefaultResponseB()
}
}
.map(TCPMessage.encode(_).toByteString)