
How to handle timeout from a WS call inside an Akka Actor


I have the following actor which sends a request to a WebService:

class VigiaActor extends akka.actor.Actor {
  val log = Logging(context.system, this)

  context.setReceiveTimeout(5 seconds)

  import VigiaActor._
  def receive = {
    case ObraExists(numero: String, unidadeGestora: String) =>
      WS.url(baseURL + s"""/obras/exists/$unidadeGestora/$numero""").withHeaders("Authorization" -> newToken).get.pipeTo(sender)
    case ReceiveTimeout =>
      val e = TimeOutException("VIGIA: Receive timed out")
      throw e
  }

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: TimeOutException         => Resume       
      case _: Exception                => Restart
    }
}

The call to this actor is part of a validation method that should throw an exception if a timeout occurs while trying to communicate with the WS:

implicit val timeout = Timeout(5 seconds)
lazy val vigiaActor : ActorRef = Akka.system.actorOf(Props[VigiaActor])

(vigiaActor ? VigiaActor.ObraExists(empenho.obra.get, empenho.unidadeGestora)).map {
  case r : WSResponse =>
    val exists = r.body.toBoolean

    if (!exists && empenho.tipoMeta.get.equals(4)) {
      erros.adicionarErro(controle.codigoArquivo, row, line, s"Nº de Obra não informado ou inválido para o Tipo de Meta 4 - Obras" , TipoErroImportacaoEnum.WARNING)
    }

  case _ => erros.adicionarErro(controle.codigoArquivo, row, line, s"Nº de Obra não informado ou inválido para o Tipo de Meta 4 - Obras" , TipoErroImportacaoEnum.WARNING)
}

I am new to this Actor thing, and I am trying to resolve some blocking situations in the code.

The problem is that I have no idea how to "catch" the TimeOutException at the actor's call site.

UPDATE

I switched the validation method to:

protected def validateRow(row: Int, line: String, empenho: Empenho, calendarDataEnvioArquivo: Calendar)(implicit s: Session, controle: ControleArquivo, erros:ImportacaoException): Unit = {
    implicit val timeout = Timeout(5 seconds)
    lazy val vigiaActor : ActorRef = Akka.system.actorOf(Props[VigiaActor])

    (vigiaActor ? VigiaActor.ObraExists(empenho.obra.get, empenho.unidadeGestora)).map {
      case e: TimeOutException => println("TIMOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOUT!!!!")
      case r: WSResponse => {...}
    }
}

and the actor's ReceiveTimeout part to:

case ReceiveTimeout =>
  val e = TimeOutException("VIGIA: Receive timed out")
  sender ! e

I am still getting the same log message as before:

[INFO] [07/20/2017 10:28:05.738] [application-akka.actor.default-dispatcher-5] [akka://application/deadLetters] Message [model.exception.TimeOutException] from Actor[akka://application/user/$c#1834419855] to Actor[akka://application/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.


Solution

  • context.setReceiveTimeout(5 seconds) causes Akka to send a ReceiveTimeout message to VigiaActor whenever that actor goes five seconds without receiving a message. Because Akka itself sends this message, sender in the case ReceiveTimeout => clause is not the original sender of the ObraExists message; as your log shows, the reply ends up in deadLetters. That is why sending the exception to sender in your updated code doesn't do what you expect.

    Setting the receive timeout in VigiaActor has nothing to do with a WS request timeout: the receive timeout only measures how long the actor has gone without receiving a message, and no message is sent to VigiaActor when the request times out. Even when a WS request takes longer than five seconds, another ObraExists message could be enqueued in the actor's mailbox in the meantime, which resets the idle timer so that no ReceiveTimeout is triggered.

    In short, setting the actor's receive timeout is not the right mechanism to handle the WS request timeout. (With your current approach of piping the result of the get request to the sender, you could adjust the sender to handle a timeout. In fact, I'd forgo the VigiaActor altogether and simply make the WS call directly in the validateRow method, but getting rid of the actor is probably not the point of your question.)

    If you must handle a WS request timeout in the actor, one way to do that is something like the following:

    import scala.util.{Failure, Success}
    
    class VigiaActor extends akka.actor.Actor {
      import VigiaActor._
      val log = Logging(context.system, this)
    
      def receive = {
        case ObraExists(numero: String, unidadeGestora: String) =>
          val s = sender // capture the original sender
          WS.url(baseURL + s"""/obras/exists/$unidadeGestora/$numero""")
            .withHeaders("Authorization" -> newToken)
            .withRequestTimeout(5 seconds) // set the timeout
            .get
            .onComplete {
              case Success(resp) =>
                s ! resp
              case Failure(e: scala.concurrent.TimeoutException) =>
                s ! TimeOutException("VIGIA: Receive timed out")
              case Failure(_) =>
                // do something in the case of non-timeout failures
            }
      }
    }
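
On the calling side, the reply from an actor like the one above is either the response or a TimeOutException sent as an ordinary message, so it can be matched inside map. The following is only a minimal sketch using the Scala standard library: FakeResponse, Outcome, handle and AskReplyHandling are hypothetical names standing in for WSResponse and for the body of validateRow, and reply stands in for the Future[Any] returned by vigiaActor ? ObraExists(...). The recover clause covers the case where the future itself fails. That happens, for example, when the actor never replies (as in the non-timeout Failure branch above), because the ask then fails with akka.pattern.AskTimeoutException, which is a subclass of java.util.concurrent.TimeoutException.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object AskReplyHandling {
  // Hypothetical stand-ins for the types used in the question.
  final case class TimeOutException(msg: String) extends Exception(msg)
  final case class FakeResponse(body: String) // plays the role of WSResponse

  // Hypothetical result type, used only to make the outcome explicit.
  sealed trait Outcome
  case object ObraFound extends Outcome
  case object ObraMissing extends Outcome
  case object WsTimedOut extends Outcome

  // `reply` plays the role of `vigiaActor ? ObraExists(...)`, a Future[Any].
  def handle(reply: Future[Any]): Future[Outcome] =
    reply
      .map[Outcome] {
        // The actor replied with the web service response.
        case FakeResponse(body) => if (body.toBoolean) ObraFound else ObraMissing
        // The actor replied with the timeout as a plain message.
        case _: TimeOutException => WsTimedOut
      }
      .recover {
        // The future itself failed with a timeout (e.g. the ask got no reply).
        case _: java.util.concurrent.TimeoutException => WsTimedOut
      }
}
```

In validateRow, the WsTimedOut branch would be the place to call erros.adicionarErro or to throw, depending on how the validation should react. Any reply that matches neither case fails the resulting future with a MatchError, which this sketch deliberately leaves unhandled.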