I have the following actor which sends a request to a WebService:
    class VigiaActor extends akka.actor.Actor {
      val log = Logging(context.system, this)
      context.setReceiveTimeout(5 seconds)

      import VigiaActor._

      def receive = {
        case ObraExists(numero: String, unidadeGestora: String) =>
          WS.url(baseURL + s"""/obras/exists/$unidadeGestora/$numero""")
            .withHeaders("Authorization" -> newToken)
            .get
            .pipeTo(sender)
        case ReceiveTimeout =>
          val e = TimeOutException("VIGIA: Receive timed out")
          throw e
      }

      override val supervisorStrategy =
        OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 minute) {
          case _: ArithmeticException => Resume
          case _: NullPointerException => Restart
          case _: IllegalArgumentException => Stop
          case _: TimeOutException => Resume
          case _: Exception => Restart
        }
    }
The call to this actor is part of a validation method that should throw an exception if a timeout occurs while trying to communicate with the WS:
    implicit val timeout = Timeout(5 seconds)
    lazy val vigiaActor: ActorRef = Akka.system.actorOf(Props[VigiaActor])

    (vigiaActor ? VigiaActor.ObraExists(empenho.obra.get, empenho.unidadeGestora)).map {
      case r: WSResponse =>
        val exists = r.body.toBoolean
        if (!exists && empenho.tipoMeta.get.equals(4)) {
          erros.adicionarErro(controle.codigoArquivo, row, line, s"Nº de Obra não informado ou inválido para o Tipo de Meta 4 - Obras", TipoErroImportacaoEnum.WARNING)
        }
      case _ =>
        erros.adicionarErro(controle.codigoArquivo, row, line, s"Nº de Obra não informado ou inválido para o Tipo de Meta 4 - Obras", TipoErroImportacaoEnum.WARNING)
    }
I am new to actors, and I am trying to resolve some blocking situations in the code.
The problem is that I have no idea how to "catch" the TimeOutException at the point where the actor is called.
UPDATE
I switched the validation method to:
    protected def validateRow(row: Int, line: String, empenho: Empenho, calendarDataEnvioArquivo: Calendar)(implicit s: Session, controle: ControleArquivo, erros: ImportacaoException): Unit = {
      implicit val timeout = Timeout(5 seconds)
      lazy val vigiaActor: ActorRef = Akka.system.actorOf(Props[VigiaActor])

      (vigiaActor ? VigiaActor.ObraExists(empenho.obra.get, empenho.unidadeGestora)).map {
        case e: TimeOutException => println("TIMOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOUT!!!!")
        case r: WSResponse => {...}
      }
    }
and the actor's ReceiveTimeout part to:
    case ReceiveTimeout =>
      val e = TimeOutException("VIGIA: Receive timed out")
      sender ! e
I am still getting the same log message as before:
[INFO] [07/20/2017 10:28:05.738] [application-akka.actor.default-dispatcher-5] [akka://application/deadLetters] Message [model.exception.TimeOutException] from Actor[akka://application/user/$c#1834419855] to Actor[akka://application/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
context.setReceiveTimeout(5 seconds) triggers the sending of a ReceiveTimeout message to VigiaActor if that actor doesn't receive a message for five seconds. Akka internally sends the ReceiveTimeout to your actor, which is why in your updated code, trying to send the exception to sender doesn't do what you expect. In other words, sender in the case ReceiveTimeout => clause is not the original sender of the ObraExists message.
Setting the receive timeout in VigiaActor has nothing to do with a WS request timeout, because no message is sent to VigiaActor if the request times out. Even if a message were sent to the actor when a WS request isn't completed within five seconds, another ObraExists message could have been enqueued in the actor's mailbox in the meantime, which resets the receive timer and prevents a ReceiveTimeout from being triggered.
In short, setting the actor's receive timeout is not the right mechanism to handle the WS request timeout. (With your current approach of piping the result of the get request to the sender, you could adjust the sender to handle a timeout. In fact, I'd forgo the VigiaActor altogether and simply make the WS call directly in the validateRow method, but getting rid of the actor is probably not the point of your question.)
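To illustrate handling the timeout on the caller's side, here is a minimal sketch that uses only the Scala standard library. The names CallerSideTimeout, obraExists and validate are hypothetical, and obraExists merely simulates a request whose future fails with a TimeoutException; it is not the real WS call. The same recover step should apply to the future returned by ?, since that future fails with akka.pattern.AskTimeoutException, which is a subclass of java.util.concurrent.TimeoutException, when no reply arrives in time.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object CallerSideTimeout {
  // Hypothetical stand-in for the WS request: an empty number simulates a
  // request that timed out, anything else simulates a "true" response body.
  def obraExists(numero: String): Future[String] =
    if (numero.isEmpty) Future.failed(new TimeoutException("request timed out"))
    else Future.successful("true")

  // Returns an error description if validation fails, or None if it passes.
  def validate(numero: String): Future[Option[String]] =
    obraExists(numero)
      .map(body => if (body.toBoolean) None else Some("invalid obra"))
      // A failed future skips map and lands here, so the timeout is handled
      // where the validation result is produced.
      .recover { case _: TimeoutException => Some("timeout") }
}
```

Note that map only ever sees successful results; a failed future has to be handled with recover (or onComplete), which is why matching on an exception type inside map never fires for a failed future.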
If you must handle a WS request timeout in the actor, one way to do that is something like the following:
    import scala.util.{Failure, Success}

    class VigiaActor extends akka.actor.Actor {
      import VigiaActor._

      val log = Logging(context.system, this)

      def receive = {
        case ObraExists(numero: String, unidadeGestora: String) =>
          val s = sender // capture the original sender
          WS.url(baseURL + s"""/obras/exists/$unidadeGestora/$numero""")
            .withHeaders("Authorization" -> newToken)
            .withRequestTimeout(5 seconds) // set the timeout
            .get
            .onComplete {
              case Success(resp) =>
                s ! resp
              case Failure(e: scala.concurrent.TimeoutException) =>
                s ! TimeOutException("VIGIA: Receive timed out")
              case Failure(_) =>
                // do something in the case of non-timeout failures
            }
      }
    }
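One thing to keep in mind with the code above: if the non-timeout branch sends nothing, the caller's ask only completes once its own Timeout expires. A possible way to make every outcome produce a reply is sketched below using only the standard library. The Reply types and the toReply function are hypothetical names introduced for illustration, and a String stands in for the WS response; in the actor, the result of such a function would be what gets sent to s.

```scala
import java.util.concurrent.TimeoutException
import scala.util.{Failure, Success, Try}

object ReplyMapping {
  // Hypothetical reply protocol: one message type per outcome.
  sealed trait Reply
  final case class Body(text: String) extends Reply
  final case class TimedOut(message: String) extends Reply
  final case class Failed(cause: Throwable) extends Reply

  // Maps the completed request to exactly one reply, so no branch stays silent.
  def toReply(result: Try[String]): Reply = result match {
    case Success(body)                => Body(body)
    case Failure(_: TimeoutException) => TimedOut("VIGIA: request timed out")
    case Failure(other)               => Failed(other)
  }
}
```

The caller can then pattern match on Body, TimedOut and Failed in the map over the ask future, in the same way the updated validateRow matches on TimeOutException and WSResponse.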