
Stop Akka stream Source when web socket connection is closed by the client


I have an Akka HTTP WebSocket Route with code similar to:

private val wsReader: Route = path("v1" / "data" / "ws") {
  log.info("Opening websocket connecting ...")

  val testSource = Source
    .repeat("Hello")
    .throttle(1, 1.seconds)
    .map(x => {
      println(x)
      x
    })
    .map(TextMessage.Strict)
    .limit(1000)

  extractUpgradeToWebSocket { upgrade ⇒
    complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, testSource))
  }
}

Everything works fine (the client receives one test message every second). The only problem is that I don't understand how to stop/close the Source (testSource) when the client closes the WebSocket connection.

You can see that the source continues to produce elements (see the println) even after the WebSocket is down.

How can I detect a client disconnection?


Solution

  • handleMessagesWithSinkSource is implemented as:

    /**
     * The high-level interface to create a WebSocket server based on "messages".
     *
     * Returns a response to return in a request handler that will signal the
     * low-level HTTP implementation to upgrade the connection to WebSocket and
     * use the supplied inSink to consume messages received from the client and
     * the supplied outSource to produce message to sent to the client.
     *
     * Optionally, a subprotocol out of the ones requested by the client can be chosen.
     */
    def handleMessagesWithSinkSource(
      inSink:      Graph[SinkShape[Message], Any],
      outSource:   Graph[SourceShape[Message], Any],
      subprotocol: Option[String]                   = None): HttpResponse =
    
      handleMessages(Flow.fromSinkAndSource(inSink, outSource), subprotocol)
    

    This means the sink and the source are independent: the source keeps producing elements even after the client closes the incoming side of the connection. It should stop when the client resets the connection completely, though.
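
The independence of the two sides can be reproduced without a real WebSocket. The following is only a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem is enough to materialize a stream). The object name, the finite three-element source, and the use of Source.empty to stand in for a client that closes its sending side are all illustrative assumptions, not part of the original code.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of akka-http.
object IndependentSidesDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("independent-sides-demo")
    try {
      // Stand-in for the server's outgoing messages (finite so the demo ends).
      val outSource = Source(List("Hello", "Hello", "Hello"))

      // Same construction that handleMessagesWithSinkSource uses internally.
      val handler = Flow.fromSinkAndSource(Sink.ignore, outSource)

      // Source.empty models an incoming side that completes immediately.
      // The outgoing side is not affected and still emits every element.
      Await.result(
        Source.empty[String].via(handler).runWith(Sink.seq),
        5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Even though the input completes before anything is sent, all three elements reach the output, which mirrors what the println in the question shows.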

    To stop producing outgoing data as soon as the incoming side of the connection is closed, you can use Flow.fromSinkAndSourceCoupled instead:

    val socket = upgrade.handleMessages(
      Flow.fromSinkAndSourceCoupled(inSink, outSource),
      subprotocol = None
    )
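
Applied to the route from the question, this could look roughly like the sketch below. It is not a definitive implementation: the object name CoupledWsRoute is hypothetical, handleWebSocketMessages is used here in place of extractUpgradeToWebSocket for brevity, and the watchTermination callback is an optional addition (not mentioned above) that is one possible way to be notified when the stream ends, for example because the client disconnected.

```scala
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical wrapper object, used only for this illustration.
object CoupledWsRoute {
  def route(implicit ec: ExecutionContext): Route =
    path("v1" / "data" / "ws") {
      // Same outgoing source as in the question.
      val outSource: Source[Message, _] = Source
        .repeat("Hello")
        .throttle(1, 1.second)
        .map(text => TextMessage(text))
        .limit(1000)

      // Coupled: when the incoming side completes or fails, the outgoing
      // source is cancelled as well (and vice versa).
      val handler: Flow[Message, Message, _] =
        Flow.fromSinkAndSourceCoupled(Sink.ignore, outSource)
          .watchTermination() { (_, done) =>
            // Assumption: logging is enough here; replace with real cleanup.
            done.onComplete {
              case Success(_) => println("WebSocket stream finished")
              case Failure(e) => println(s"WebSocket stream failed: ${e.getMessage}")
            }
          }

      handleWebSocketMessages(handler)
    }
}
```

With the coupled flow, the throttled source should stop printing shortly after the client closes the connection, and the termination callback fires at that point.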