
Akka websocket client: actively disconnect from server and/or replace sink


I started to learn Akka and came across a challenge for which I can't find an easy solution, despite having waded through the documentation and related Stack Overflow questions.

Building on the Client-Side Websocket Support example on the Akka website, I am using the following Scala snippet as a basis:

  val flow: Flow[Message, Message, Future[Done]] =
    Flow.fromSinkAndSourceMat(printSink, Source.maybe[Message])(Keep.left)

  val (upgradeResponse, closed) =
    Http().singleWebSocketRequest(WebSocketRequest("ws://localhost/ws"), flow)

My use case is a client (printSink) consuming a continuous stream from the websocket server. The communication is unidirectional, so the client never needs to send anything.

My questions are as follows:

  1. I need to regularly force a reconnection to the websocket server, and for that I need to disconnect first. But for the life of me, I can't find a way to do a simple disconnect.
  2. In a somewhat opposite scenario, I need to keep the websocket connection alive and "swap out" the sink. Is this even possible, i.e. without creating another websocket connection?

Solution

  • For question 1 (forcing a disconnect from the client), this should work:

    // Keep.both exposes the sink's completion future and the promise behind Source.maybe
    val flow: Flow[Message, Message, (Future[Done], Promise[Option[Message]])] =
      Flow.fromSinkAndSourceMat(
        printSink,
        Source.maybe[Message]
      )(Keep.both)

    val (upgradeResponse, (closed, disconnect)) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://localhost/ws"), flow)
    

    The disconnect promise can then be completed with None, which completes the outgoing side of the stream and closes the connection:

    disconnect.success(None)
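
Putting the pieces together, the periodic reconnect from question 1 might look roughly like the sketch below. It assumes Akka 2.6 and Akka HTTP 10.2 (where an implicit ActorSystem also supplies the materializer). The names ReconnectSketch, clientFlow and connectLoop, the 30-second interval, and the body of printSink are illustrative assumptions, not part of the original question or answer.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, WebSocketRequest}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}

object ReconnectSketch {
  // Stand-in for the question's printSink, which the original does not show.
  // Note: streamed (non-strict) messages would also need their inner stream consumed.
  val printSink: Sink[Message, Future[Done]] =
    Sink.foreach[Message](msg => println(msg))

  // Inbound messages go to `sink`; the outbound side sends nothing and stays
  // open until the materialized promise is completed.
  def clientFlow(
      sink: Sink[Message, Future[Done]]
  ): Flow[Message, Message, (Future[Done], Promise[Option[Message]])] =
    Flow.fromSinkAndSourceMat(sink, Source.maybe[Message])(Keep.both)

  // Opens a connection, asks it to close after `every`, then opens a new one.
  def connectLoop(uri: String, every: FiniteDuration)(implicit system: ActorSystem): Unit = {
    import system.dispatcher
    val (_, (closed, disconnect)) =
      Http().singleWebSocketRequest(WebSocketRequest(uri), clientFlow(printSink))
    // trySuccess does nothing if the promise has already been completed.
    system.scheduler.scheduleOnce(every) { disconnect.trySuccess(None); () }
    // Reconnect once the inbound side has finished, whether it completed or failed.
    closed.onComplete(_ => connectLoop(uri, every))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ws-client")
    connectLoop("ws://localhost/ws", 30.seconds)
  }
}
```

This sketch reconnects immediately, so if the server is unreachable it will retry in a tight loop; a real client would probably want a delay or backoff before calling connectLoop again.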
    

  • For question 2 (swapping the sink while keeping the connection open), my intuition is that this kind of dynamic stream operation would require a custom stream operator, i.e. one level below the Graph DSL and two levels below the "normal" scaladsl/javadsl. I don't have much direct experience there, to be honest.
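
One possible alternative to a custom operator, which is not part of the answer above, is Akka Streams' dynamic hub support: the websocket's inbound side feeds a BroadcastHub, and consumers are attached to and detached from the hub while the connection stays open. The sketch below assumes Akka 2.6 and Akka HTTP 10.2; SwapSinkSketch, attach, the buffer size of 16, and the two printing sinks are hypothetical names and values chosen for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, WebSocketRequest}
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}

object SwapSinkSketch {
  // Attaches `sink` to the hub. The returned kill switch detaches only this
  // consumer; the hub and the websocket connection feeding it keep running.
  def attach[T, M](hub: Source[T, NotUsed], sink: Sink[T, M])(
      implicit system: ActorSystem
  ): (UniqueKillSwitch, M) =
    hub.viaMat(KillSwitches.single[T])(Keep.right).toMat(sink)(Keep.both).run()

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ws-client")

    // The inbound side ends in a hub instead of a fixed sink; Source.maybe
    // keeps the outbound side open, as in the answer to question 1.
    val flow = Flow.fromSinkAndSourceMat(
      BroadcastHub.sink[Message](16), // buffer size must be a power of two
      Source.maybe[Message]
    )(Keep.both)

    val (upgradeResponse, (hub, disconnect)) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://localhost/ws"), flow)

    // First consumer.
    val (first, _) = attach(hub, Sink.foreach[Message](m => println(s"first: $m")))

    // Later: detach the first consumer and attach a different one.
    first.shutdown()
    val (second, _) = attach(hub, Sink.foreach[Message](m => println(s"second: $m")))
  }
}
```

As far as I understand the hub's documented behaviour, it backpressures the upstream rather than dropping elements while no consumer is attached, but a message already handed to the first consumer when its switch is shut down may not be processed. Whether that is acceptable depends on the use case.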