Tags: scala, connection-pooling, akka-http

Akka HTTP connection pooling


I am trying to use client-side connection pooling in our Akka HTTP application. However, requests seem to hang once the maximum number of connections has been reached. I've condensed the problem down to the following:

import java.lang.Thread.UncaughtExceptionHandler
import java.net.ServerSocket
import akka.actor.ActorSystem
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.http.scaladsl.client.RequestBuilding._

import scala.annotation.tailrec
import scala.util.{Success, Try}

object AkkaProblem extends App {
    val server = new ServerSocket(0)
    val serverPort = server.getLocalPort

    object responder extends Runnable with UncaughtExceptionHandler {
       val cr = '\r'
       val httpResponse =
  s"""HTTP/1.1 404 Not Found$cr
      |Content-Type: application/json;charset=UTF-8$cr
      |Date: Mon, 26 Sep 2016 06:30:13 GMT$cr
      |Connection: keep-alive$cr
      |Transfer-Encoding: chunked$cr
      |$cr
      |12$cr
      |{"Hello": "World"}$cr
      |0$cr
      |$cr
      |""".stripMargin

       override final def run(): Unit = {
           val socket = server.accept()
           @tailrec def sendResponse(): Unit = {
               socket.getOutputStream.write(httpResponse.getBytes)
               sendResponse()
           }

           sendResponse()
       }

       override def uncaughtException(t: Thread, e: Throwable): Unit = ()
    }

    for (nr <- 1 to 4) {
        val thread = new Thread(responder, s"response-thread-$nr")
        thread.setUncaughtExceptionHandler(responder)
        thread.setDaemon(true)
        thread.start()
    }


    implicit val system = ActorSystem("main")
    import system.dispatcher
    implicit val mat = ActorMaterializer()

    val serverUri = Uri(s"http://localhost:$serverPort")
    val request = Get(serverUri)

    val poolFlow: Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), Http.HostConnectionPool] =
        Http().newHostConnectionPool(serverUri.authority.host.address, serverUri.authority.port, ConnectionPoolSettings("max-connections: 4"))

    val source = Source.repeat(request).take(1000).map((_, ()))

    val runRequest = source.viaMat(poolFlow)(Keep.right).toMat(Sink.seq)(Keep.both)
    val (connectionPool, response) = runRequest.run()

    response.map(_.map(_._1)).andThen {
        case Success(responses) =>
            val byResultType = responses.groupBy(_.isSuccess).mapValues(_.size)

            println(s"Received response. Got ${byResultType.get(true)} successes, ${byResultType.get(false)} errors")
            connectionPool.shutdown() andThen {
                case done =>
                    println("Connection pool shut down")
                    system.terminate()
            }
    }
}

I would expect the program to report 1000 successes fairly quickly and then shut down. Instead, it hangs indefinitely. The problem disappears when the number of requests is lowered to match the number of allowed connections.

As a workaround, we can create a separate pool for each connection, but doing so defeats the purpose of having a pool at all.

The thread dump shows no deadlocks or other obvious misbehaviour:

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1000m; support was removed in 8.0
2016-09-26 13:24:18
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode):

"Attach Listener" #25 daemon prio=9 os_prio=31 tid=0x00007f86bf001000 nid=0x3307 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"main-akka.actor.default-dispatcher-10" #24 prio=5 os_prio=31 tid=0x00007f86bbb4a000 nid=0x6b03 waiting on condition [0x000000011d717000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-9" #23 prio=5 os_prio=31 tid=0x00007f86bc402800 nid=0x6903 waiting on condition [0x000000011d614000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-8" #22 prio=5 os_prio=31 tid=0x00007f86bbb49800 nid=0x6703 waiting on condition [0x000000011d511000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-7" #21 prio=5 os_prio=31 tid=0x00007f86bb292000 nid=0x6503 waiting on condition [0x000000011d40e000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.io.pinned-dispatcher-6" #20 prio=5 os_prio=31 tid=0x00007f86bcbcd000 nid=0x6407 runnable [0x000000011d10b000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
        at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
        at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked  (a sun.nio.ch.Util$2)
        - locked  (a java.util.Collections$UnmodifiableSet)
        - locked  (a sun.nio.ch.KQueueSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
        at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.tryRun(SelectionHandler.scala:115)
        at akka.io.SelectionHandler$ChannelRegistryImpl$Task.run(SelectionHandler.scala:219)
        at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.run(SelectionHandler.scala:148)
        at akka.util.SerializedSuspendableExecutionContext.run$1(SerializedSuspendableExecutionContext.scala:67)
        at akka.util.SerializedSuspendableExecutionContext.run(SerializedSuspendableExecutionContext.scala:71)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

"DestroyJavaVM" #19 prio=5 os_prio=31 tid=0x00007f86bcba2800 nid=0xd03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"main-akka.actor.default-dispatcher-5" #18 prio=5 os_prio=31 tid=0x00007f86bc252800 nid=0x5d03 waiting on condition [0x000000011c488000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-4" #17 prio=5 os_prio=31 tid=0x00007f86bc823800 nid=0x5b03 waiting on condition [0x000000011c185000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-3" #16 prio=5 os_prio=31 tid=0x00007f86bba26000 nid=0x5903 waiting on condition [0x000000011c082000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-akka.actor.default-dispatcher-2" #15 prio=5 os_prio=31 tid=0x00007f86bc256000 nid=0x5703 waiting on condition [0x000000011bf7f000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for   (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"main-scheduler-1" #14 prio=5 os_prio=31 tid=0x00007f86bc248800 nid=0x5503 waiting on condition [0x000000011b8d2000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:87)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:268)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:238)
        at java.lang.Thread.run(Thread.java:745)

"response-thread-4" #13 daemon prio=5 os_prio=31 tid=0x00007f86bc195000 nid=0x5303 runnable [0x000000011b7cf000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45)
        at java.lang.Thread.run(Thread.java:745)

"response-thread-3" #12 daemon prio=5 os_prio=31 tid=0x00007f86bb0fa800 nid=0x5103 runnable [0x000000011b6cc000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45)
        at java.lang.Thread.run(Thread.java:745)

"response-thread-2" #11 daemon prio=5 os_prio=31 tid=0x00007f86bb9ca000 nid=0x4f03 runnable [0x000000011b5c9000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45)
        at java.lang.Thread.run(Thread.java:745)

"response-thread-1" #10 daemon prio=5 os_prio=31 tid=0x00007f86bb9c1000 nid=0x4d03 runnable [0x000000011b4c6000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41)
        at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45)
        at java.lang.Thread.run(Thread.java:745)

"Monitor Ctrl-Break" #9 daemon prio=5 os_prio=31 tid=0x00007f86bb078000 nid=0x4b03 runnable [0x000000011b101000]
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at com.intellij.rt.execution.application.AppMain$1.run(AppMain.java:79)
        at java.lang.Thread.run(Thread.java:745)

"Service Thread" #8 daemon prio=9 os_prio=31 tid=0x00007f86bc810800 nid=0x4703 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007f86bc805800 nid=0x4503 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007f86bb866000 nid=0x4303 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007f86bb833800 nid=0x4103 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007f86bb844000 nid=0x3e23 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007f86bc001000 nid=0x2b03 in Object.wait() [0x0000000118c24000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        - locked  (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007f86bb81f000 nid=0x2903 in Object.wait() [0x0000000118b21000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
        - locked  (a java.lang.ref.Reference$Lock)

"VM Thread" os_prio=31 tid=0x00007f86bb014800 nid=0x2703 runnable 

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007f86bc005000 nid=0x1f03 runnable 

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007f86bc005800 nid=0x2103 runnable 

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007f86bc006800 nid=0x2303 runnable 

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007f86bc007000 nid=0x2503 runnable 

"VM Periodic Task Thread" os_prio=31 tid=0x00007f86bb80c000 nid=0x4903 waiting on condition 

JNI global references: 250

Solution

  • You need to explicitly consume the entity (body) of every HttpResponse you receive. The entity of the response is a stream, and until it is consumed the connection stays occupied and cannot serve the next request. The documentation details the request-response cycle. Either the server must send Connection: close in the response headers, or you must attach a Sink (e.g. Sink.ignore) to the entity's data stream to consume it.

    In practice, there are a few ways to deal with the HttpResponse. One is to call its toStrict(timeout: FiniteDuration) method, which reads the whole entity into memory and thereby frees the connection; the timeout limits how long to wait for the complete entity to arrive. If you are not interested in the entity, you can call discardEntityBytes() on the HttpResponse instead. Finally, you can consume the stream by unmarshalling it, e.g. Unmarshal(resp.entity).to[SomeClass].
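
    The three options above could look roughly like the following sketch. It assumes the Akka HTTP 10.0.x-era API used in the question (ActorMaterializer, tuple-based pool flow) and a hypothetical well-behaved server on localhost:8080; the object and helper names (ConsumeEntities, statusOnly, strictBody, unmarshalled) are made up for illustration, and the 3-second timeout is arbitrary.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try

object ConsumeEntities extends App {
  implicit val system = ActorSystem("main")
  import system.dispatcher
  implicit val mat = ActorMaterializer()

  // Assumption: a real HTTP server is listening on localhost:8080.
  val pool = Http().cachedHostConnectionPool[Unit]("localhost", 8080)

  // Option 1: the body is not needed, so drain it and keep only the status code.
  def statusOnly(result: Try[HttpResponse]): Try[Int] =
    result.map { response =>
      response.discardEntityBytes() // drains the entity stream so the connection becomes free
      response.status.intValue
    }

  // Option 2: buffer the whole body in memory, waiting at most 3 seconds for it.
  def strictBody(response: HttpResponse): Future[String] =
    response.entity.toStrict(3.seconds).map(_.data.utf8String)

  // Option 3: unmarshal the entity; String has a predefined unmarshaller.
  def unmarshalled(response: HttpResponse): Future[String] =
    Unmarshal(response.entity).to[String]

  Source.repeat(HttpRequest(uri = "/")).take(1000).map((_, ()))
    .via(pool)
    .map { case (result, _) => statusOnly(result) } // consume every entity before moving on
    .runWith(Sink.seq)
    .onComplete { results =>
      println(results.map(_.count(_.isSuccess)))
      system.terminate()
    }
}
```

    Only option 1 is wired into the stream here; strictBody or unmarshalled could be used instead via mapAsync when the body is actually needed. The key point is that each response's entity is consumed in one way or another before the pool is expected to pick up further requests.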