Search code examples
scalaakkaakka-streamakka-http

How to time how long it takes for Akka HTTP to complete HTTP request


I have built a flow which takes a case class (Event) and sends it to a HTTP endpoint and returns it back. It's implemented like so:

Flow[Event]
    .mapAsync(16)(eventToHttpRequest)
    .via(connection)
    .map(handleResponse)

For reference, here is the handleResponse method:

def handleResponse(endpoint: String)(responseTuple: (Try[HttpResponse], Event))(implicit actorSystem: ActorSystem, mat: ActorMaterializer) = {
    responseTuple match {
      case (Success(response), event) =>
        response.status.intValue() match {
          case code if code >= 500 =>
            val message = s"Server side error sending event with id ${event.id} to ingestion gateway, status : ${response.status}"
            LOG.error(message)
            response.discardEntityBytes()
            throw new UnexpectedException(message)
          case code if (code >= 400) && (code < 500) =>           
            val message = s"Bad request sending  event with id ${event.id} to ingestion gateway, status : ${response.status}"
            LOG.error(message)
            throw new UnexpectedException(message)
          case _ =>
            LOG.debug(s"Sent event with id ${event.id}, status : ${response.status}")
            response.discardEntityBytes()
            event
        }
      case (Failure(ex), justDataEvent) =>
        LOG.error(s"Could not connect to $endpoint")
        throw ex
    }
  }

I would like to monitor how long the HTTP request takes. "How long a request takes" could be thought of as:

  1. How long until we get back the initial headers and status code
  2. How long until we get the entire body in memory

In this case they will be very similar, as the response is small, but it would be good to know how to compute both.


Solution

  • For the request response cycle this can be implemented with an intermediate flow that adds a start time to the http request and event:

    type EventAndTime = Tuple2[Event, Long]
    
    val addQueryTime : Tuple2[HttpRequest, Event] => Tuple2[HttpRequest, EventAndTime] = 
      (tup) => (tup._1, (tup._2, java.lang.System.currentTimeMillis()))
    
    val addQueryTimeFlow : Flow[(HttpRequest, Event), (HttpRequest, EventAndTime),_] = 
      Flow[(HttpRequest, Event)] map addQueryTime
    

    Now handleRequest will receive the Event and the system time after going through the conn:

    Flow[Event]
      .mapAsync(16)(eventToHttpRequest)
      .via(addQueryTimeFlow)
      .via(connection)
      .map(handleResponse)
    

    handleRequest can just ask for the system time again and do a diff.

    You can do a similar trick with response.entity to time how long that takes:

    val timeout : FiniteDuration = ???
    
    case class EntityAndTime(strict : Strict, startTime : Long, endTime : Long)
    
    val entity = response.entity
    
    val entityAndTime : Future[EntityAndTime] = 
      Future(System.currentTimeMillis())
      .flatMap { startTime => 
        entity
          .toStrict(timeout)
          .map { strict =>
            EntityAndTime(strict, startTime, System.currentTimeMillis())
          }
      }