Using akka stream and akka HTTP, I have created a stream which polls an API every 3 seconds, Unmarshalls the result to a JsValue object and sends this result to an actor. As can be seen in the following code:
// Source which performs an http request every 3 seconds.
val source = Source.tick(0.seconds,
3.seconds,
HttpRequest(uri = Uri(path = Path("/posts/1"))))
// Processes the result of the http request
val flow = Http().outgoingConnectionHttps("jsonplaceholder.typicode.com").mapAsync(1) {
// Able to reach the API.
case HttpResponse(StatusCodes.OK, _, entity, _) =>
// Unmarshal the json response.
Unmarshal(entity).to[JsValue]
// Failed to reach the API.
case HttpResponse(code, _, entity, _) =>
entity.discardBytes()
Future.successful(code.toString())
}
// Run stream
source.via(flow).runWith(Sink.actorRef[Any](processJsonActor,akka.actor.Status.Success(("Completed stream"))))
This works, however the stream closes after 100 HttpRequests (ticks).
What is the cause of this behaviour?
Definitely something to do with outgoingConnectionHttps
. This is a low level DSL and there could be some misconfigured setting somewhere which is causing this (although I couldn't figure out which one).
Usage of this DSL is actually discouraged by the docs.
Try using a higher level DSL like cached connection pool
val flow = Http().cachedHostConnectionPoolHttps[NotUsed]("akka.io").mapAsync(1) {
// Able to reach the API.
case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
// Unmarshal the json response.
Unmarshal(entity).to[String]
// Failed to reach the API.
case (Success(HttpResponse(code, _, entity, _)), _) =>
entity.discardBytes()
Future.successful(code.toString())
case (Failure(e), _) ⇒
throw e
}
// Run stream
source.map(_ → NotUsed).via(flow).runWith(...)