I have built a flow that takes a case class (Event), sends it to an HTTP endpoint, and returns it. It's implemented like so:
Flow[Event]
  .mapAsync(16)(eventToHttpRequest)
  .via(connection)
  .map(handleResponse)
For reference, here is the handleResponse method:
def handleResponse(endpoint: String)(responseTuple: (Try[HttpResponse], Event))(implicit actorSystem: ActorSystem, mat: ActorMaterializer) = {
  responseTuple match {
    case (Success(response), event) =>
      response.status.intValue() match {
        case code if code >= 500 =>
          val message = s"Server side error sending event with id ${event.id} to ingestion gateway, status : ${response.status}"
          LOG.error(message)
          response.discardEntityBytes()
          throw new UnexpectedException(message)
        case code if (code >= 400) && (code < 500) =>
          val message = s"Bad request sending event with id ${event.id} to ingestion gateway, status : ${response.status}"
          LOG.error(message)
          // Drain the entity here as well; an unconsumed entity can stall the connection.
          response.discardEntityBytes()
          throw new UnexpectedException(message)
        case _ =>
          LOG.debug(s"Sent event with id ${event.id}, status : ${response.status}")
          response.discardEntityBytes()
          event
      }
    case (Failure(ex), justDataEvent) =>
      LOG.error(s"Could not connect to $endpoint")
      throw ex
  }
}
I would like to monitor how long the HTTP request takes. "How long a request takes" could be thought of as:
In this case they will be very similar, as the response is small, but it would be good to know how to compute both.
For the request-response cycle, this can be implemented with an intermediate flow that pairs a start time with the HTTP request and event:
type EventAndTime = Tuple2[Event, Long]
val addQueryTime: Tuple2[HttpRequest, Event] => Tuple2[HttpRequest, EventAndTime] =
  (tup) => (tup._1, (tup._2, java.lang.System.currentTimeMillis()))
val addQueryTimeFlow: Flow[(HttpRequest, Event), (HttpRequest, EventAndTime), _] =
  Flow[(HttpRequest, Event)] map addQueryTime
Now handleResponse will receive the Event together with the start time once the pair has gone through the connection:
Flow[Event]
  .mapAsync(16)(eventToHttpRequest)
  .via(addQueryTimeFlow)
  .via(connection)
  .map(handleResponse)
handleResponse can then ask for the system time again and take the difference.
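As a rough sketch of that diff, the helper below takes the tuple that comes out of the connection and returns the event together with the elapsed milliseconds. It is deliberately generic in the response type so that it runs without Akka on the classpath. Event here is a stand-in case class, and TimingSketch, elapsedMillis, and the injectable now clock are hypothetical names introduced only for illustration.

```scala
import scala.util.Try

object TimingSketch {
  // Stand-in for the real Event case class (assumption: it has an id field).
  final case class Event(id: String)

  // Same shape as the EventAndTime alias above: the event plus its start time in ms.
  type EventAndTime = (Event, Long)

  // Returns the event and the milliseconds elapsed since its start time.
  // `now` is a hypothetical hook so the clock can be replaced in tests.
  def elapsedMillis[R](
      responseTuple: (Try[R], EventAndTime),
      now: () => Long = () => System.currentTimeMillis()
  ): (Event, Long) = {
    val (_, (event, startTime)) = responseTuple
    (event, now() - startTime)
  }
}
```

Inside handleResponse the same subtraction could be logged next to the status code. For measuring intervals, System.nanoTime() is generally the safer clock, since currentTimeMillis() follows the wall clock and can jump if the system time is adjusted.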
You can do a similar trick with response.entity to time how long consuming it takes:
// Requires an implicit ExecutionContext and Materializer in scope.
val timeout: FiniteDuration = ???
case class EntityAndTime(strict: Strict, startTime: Long, endTime: Long)
val entity = response.entity
val entityAndTime: Future[EntityAndTime] =
  Future(System.currentTimeMillis())
    .flatMap { startTime =>
      entity
        .toStrict(timeout)
        .map { strict =>
          EntityAndTime(strict, startTime, System.currentTimeMillis())
        }
    }
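The same pattern can be factored into a small helper that times any Future, not only toStrict. This is a minimal sketch using only the Scala standard library; FutureTiming and timed are hypothetical names, and the choice of System.nanoTime() over currentTimeMillis() is an assumption about which clock suits interval measurement better.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object FutureTiming {
  // Runs `body` and pairs its result with the time taken until the Future completed.
  // `body` is by-name, so the clock is started before the Future is created.
  def timed[A](body: => Future[A])(implicit ec: ExecutionContext): Future[(A, FiniteDuration)] = {
    val start = System.nanoTime()
    body.map { result =>
      (result, (System.nanoTime() - start).nanos)
    }
  }
}
```

With this in place, the entity timing might be written as timed(response.entity.toStrict(timeout)), assuming an implicit Materializer and ExecutionContext are in scope. A failed Future is passed through unchanged, so no duration is reported in that case.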