Search code examples
scalawebsocketakkaakka-streamakka-http

How to get a kill switch per user for Akka Http Websocket connection?


I'm new to Akka and Scala and self learning this to do a small project with websockets. End goal is simple, make a basic chat server that publishes + subscribes messages on some webpage.

In fact, after perusing their docs, I already found the pages that are relevant to my goal, namely this and this.

Using dynamic junctions (aka MergeHub & BroadcastHub), and the Flow.fromSinkAndSource() method, I was able to acheive a very basic example of what I wanted. We can even get a kill switch using the example from the akka docs which I have shown below. Code is like:

private lazy val connHub: Flow[Message, Message, UniqueKillSwitch] = {
    val (sink, source) = MergeHub.source[Message].toMat(BroadcastHub.sink[Message])(Keep.both).run()
    Flow.fromSinkAndSourceCoupled(sink, source).joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
  }

However, I now see one issue. The above will return a Flow that will be used by Akka's websocket directive: akka.http.scaladsl.server.Directives.handleWebSocketMessages(/* FLOW GOES HERE */) That means the akka code itself will materialize this flow for me so long as I provide it the handler.

But let's say I wanted to arbitrarily kill one user's connection through a KillSwitch (maybe because their session has expired on my application). While a user's websocket would be added through the above handler, since my code would not be explicitly materializing that flow, I won't get access to a KillSwitch. Therefore, I can't kill the connection, only the user can when they leave the webpage.

It's strange to me that the docs would mention the kill switch method without showing how I would get one using the websocket api.

Can anyone suggest a solution as to how I could obtain the kill switch per connection? Do I have a fundamental misunderstanding of how this should work?

Thanks in advance.


Solution

  • I'm very happy to say that after a lot of time, research, and coding, I have an answer for this question. In order to do this, I had to post in the Akka Gitter as well as the Lightbend discussion forum. Please refer to the amazing answer I got there for some perspective on the problem and some solutions. I'll summarize that here.

    In order to get the UniqueKillSwitch from the code that I was using, I needed to use the mapMaterializeValue() method on the Flow that I was returning. Here is the code that I'm using to return a Flow to the handleWebSocketMessages directive now:

    // note - state will not be updated properly if cancellation events come through from the client side as user->killswitch mapping may still remain in concurrent map even if the connections are closed
        Flow.fromSinkAndSourceCoupled(mergeHubSink, broadcastHubSource)
          .joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
          .mapMaterializedValue { killSwitch =>
            connections.put(user, killSwitch) // add kill switch in side effect once value is ready from materialization
            NotUsed.notUsed()
          }
    

    The above code lives in a Chatroom class I've created that has access to the mergehub and broadcast hub materialized sink and source. It also has access to a concurrent hashmap that persists the kill switch to a user. In this way, we now have access to the Kill Switch through querying it through a map. From there, you can call switch.shutdown() to kill the user's connection from the server side.

    My main issue was that I originally thought I could get the switch directly even though I didn't control the materialization. This doesn't seem possible. I suggest this method for when you know that the caller that requires your Flow doesn't care about the materialized value (aka the kill switch).

    Please reference the answer I've linked for more scenarios and ways to handle this problem.