Search code examples
akkaakka-streamakka-http

Akka stream Http singleRequest blocks whole stream while waiting for response


I am trying to integrate Akka Http into my Akka stream but in some rare cases, the stream is getting stuck.

  implicit val system: ActorSystem = ActorSystem("actor-system")

  Source(0 to 10)
    .mapAsync(10)(i ⇒ {
      val url =
        if (i == 1) "http://run.mocky.io/v3/40ff086f-1389-4ca5-ace8-1f0b3ac75582?mocky-delay=10s"
        else "http://google.com"

      Http().singleRequest(HttpRequest(uri = url))
    })
    .runForeach(r ⇒ println(s"${System.currentTimeMillis()}: ${r._1}"))

This code will be stuck for 10 seconds after the first output 1599480226827: 301 Moved Permanently and then will flush the rest at the literally same time.

And the output would be:

1599480226827: 301 Moved Permanently
1599480236826: 302 Found
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently

I expect it to output everything in order, except for the delayed one.

Why is my stream blocked by such a request? And how to avoid it?


Solution

  • From scaladocs of mapAsync

    The number of Futures that shall run in parallel is given as the first argument to mapAsync. These Futures may complete in any order, but the elements that are emitted downstream are in the same order as received from upstream.

    Your requests are sent in parallel but the function under runForeach is called in the specific order causing this delay in outputting the results. It's waiting for the second response to be available.

    You can use mapAsyncUnordered to process responses as soon as they are available.