How to handle a Future HttpResponse within an untyped Actor using pipe pattern


In my project I have to write a REST client that receives an HttpResponse as a Future from a REST service. I want to log the status code of the response and, if an exception occurs, log that exception too. How can I achieve this using the pipe pattern? Here is my code snippet:

class MetadataAggregator(implicit config: Config) extends Actor with ActorLogging {

  import context.system
  import akka.pattern.pipe

  implicit val exec = context.system.dispatcher

  val url = sys.env.getOrElse("URL", config.getString("conf.url"))

  override def receive: Receive = {
    case MetadataEvent(event, phase, topic) =>
      val payload = captureMetadata(event, phase, topic)
      publishMetadata(payload)
    case _ => //Do nothing
  }

  
  def publishMetadata(payload: String) = {
    log.info(s"Metadata payload : $payload")
    val responseFuture = Http().singleRequest(
      HttpRequest(
        HttpMethods.POST,
        uri = url,
        entity = HttpEntity(
          ContentTypes.`application/json`,
          payload
        )
      )
    )
  
    responseFuture.pipeTo(self)
  
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case ex: RuntimeException =>
      log.error(s"${self.path} incurred an exception. $ex...")
      ex.printStackTrace()
      log.info("Resuming...")
      Resume
    case any: Any =>
      log.error(s"${self.path} stopping due to $any ...")
      any.printStackTrace()
      Stop
  }
}

By the way, I cannot use Akka Typed actors, since the whole project uses untyped actors.


Solution

  • The pipeTo call sends the HttpResponse to the actor itself, so you need to handle it in the receive method. I would recommend creating a new message that carries the payload as well as the response, and sending that to self instead. This lets the handler report which payload produced a given response or failure.

    At the moment the HttpResponse is caught by case _ => and silently ignored. If the future fails, pipeTo delivers an akka.actor.Status.Failure wrapping the exception, and that is swallowed by the same case. It is generally a good idea to log unexpected messages so that this sort of thing is noticed earlier.


    Example code:

    Create a new class for the result:

     import scala.util.{Failure, Success, Try}

     case class PublishResult(payload: String, result: Try[HttpResponse])
    

    In publishMetadata:

     val responseFuture = Http().singleRequest(???)
    
     responseFuture.onComplete{ res =>
       self ! PublishResult(payload, res)
     }
     
    

    In receive add this handler:

     case PublishResult(payload, res) =>
       res match {
         case Failure(e) =>
           log.warning("Request payload {} failed: {}", payload, e.getMessage)
         case Success(response) =>
           log.debug("Request for payload {} succeeded with status {}", payload, response.status)
       }
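
Since the question asks specifically about the pipe pattern, the same idea can also be expressed with pipeTo: first turn the response future into one that always completes successfully with a PublishResult, then pipe that to self. The following is a minimal sketch, assuming Scala 2.12 or later (for the single-argument Future.transform). FakeResponse is a hypothetical stand-in for akka-http's HttpResponse so that the snippet runs without Akka on the classpath, and toPublishResult and describe are illustrative helper names, not part of any library.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object PipeSketch {
  // Hypothetical stand-in for akka.http.scaladsl.model.HttpResponse.
  final case class FakeResponse(status: Int)

  // Same shape as the PublishResult message in the answer.
  final case class PublishResult(payload: String, result: Try[FakeResponse])

  // Wrap the outcome (success or failure) in a PublishResult.
  // The returned future itself never fails, so piping it to an actor
  // always delivers a PublishResult rather than a Status.Failure.
  def toPublishResult(payload: String, response: Future[FakeResponse])(
      implicit ec: ExecutionContext): Future[PublishResult] =
    response.transform(res => Success(PublishResult(payload, res)))

  // What the receive handler would log for each outcome.
  def describe(r: PublishResult): String = r.result match {
    case Success(resp) => s"payload ${r.payload} -> status ${resp.status}"
    case Failure(e)    => s"payload ${r.payload} failed: ${e.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val ok     = toPublishResult("{}", Future.successful(FakeResponse(200)))
    val failed = toPublishResult("{}", Future.failed(new RuntimeException("boom")))
    println(describe(Await.result(ok, 5.seconds)))
    println(describe(Await.result(failed, 5.seconds)))
  }
}
```

Inside the actor this would be used as toPublishResult(payload, responseFuture).pipeTo(self), with HttpResponse in place of FakeResponse and the akka.pattern.pipe import already present in the question's code. The handler for PublishResult in receive stays the same as in the answer.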