Search code examples
scalaakkaakka-http

Why does Source.tick stop after one hundred HttpRequests?


Using akka stream and akka HTTP, I have created a stream which polls an API every 3 seconds, Unmarshalls the result to a JsValue object and sends this result to an actor. As can be seen in the following code:

// Source which performs an http request every 3 seconds.
val source = Source.tick(0.seconds,
                         3.seconds,
                         HttpRequest(uri = Uri(path = Path("/posts/1"))))

// Processes the result of the http request
val flow = Http().outgoingConnectionHttps("jsonplaceholder.typicode.com").mapAsync(1) {
  
  // Able to reach the API.
  case HttpResponse(StatusCodes.OK, _, entity, _) =>
    // Unmarshal the json response.
    Unmarshal(entity).to[JsValue]

  // Failed to reach the API.
  case HttpResponse(code, _, entity, _) =>
    entity.discardBytes()
    Future.successful(code.toString())


}

// Run stream
source.via(flow).runWith(Sink.actorRef[Any](processJsonActor,akka.actor.Status.Success(("Completed stream"))))

This works, however the stream closes after 100 HttpRequests (ticks).

What is the cause of this behaviour?


Solution

  • Definitely something to do with outgoingConnectionHttps. This is a low level DSL and there could be some misconfigured setting somewhere which is causing this (although I couldn't figure out which one).

    Usage of this DSL is actually discouraged by the docs.

    Try using a higher level DSL like cached connection pool

      val flow = Http().cachedHostConnectionPoolHttps[NotUsed]("akka.io").mapAsync(1) {
    
        // Able to reach the API.
        case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
          // Unmarshal the json response.
          Unmarshal(entity).to[String]
    
        // Failed to reach the API.
        case (Success(HttpResponse(code, _, entity, _)), _) =>
          entity.discardBytes()
          Future.successful(code.toString())
    
        case (Failure(e), _) ⇒
          throw e
      }
    
      // Run stream
      source.map(_ → NotUsed).via(flow).runWith(...)