Tags: scala, akka-stream, akka-http

How to keep my incoming websocket connection open all the time?


I connected to my websocket service using this sample client code, but currently it just connects and then shuts down.

How can I keep this connection open and never close it?

Once I make a connection, I want it to remain open until I shut down the application.

package docs.http.scaladsl

import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    import system.dispatcher

    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
        // ignore other message types
      }

    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))

    // flow to use (note: not re-usable!)
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular HTTP request, the response status is available via upgrade.response.status;
    // status code 101 (Switching Protocols) indicates that the server supports WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

Github ref: https://github.com/akka/akka-http/blob/v10.2.6/docs/src/test/scala/docs/http/scaladsl/WebSocketClientFlow.scala

Update: I have the same code as above, except I changed my sources to look like this:

val source1 = Source.single(TextMessage("""{"action":"auth","params":"APIKEY_123"}"""))
val source2 = Source.single(TextMessage("""{"action":"subscribe","params":"topic123"}"""))

val sources: Source[Message, NotUsed] =
  Source.combine(source1, source2, Source.maybe)(Concat(_))

I can see that source1 and source2 are sent to the websocket, but the websocket does not start streaming its values as it should; it just hangs.

Not sure what I am doing wrong...


Solution

  • The Akka docs call out your situation:

    The Akka HTTP WebSocket API does not support half-closed connections which means that if either stream completes the entire connection is closed (after a “Closing Handshake” has been exchanged or a timeout of 3 seconds has passed).

    In your case, outgoing (being a Source.single) completes as soon as it has emitted the TextMessage. The webSocketFlow receives the completion message and then tears down the connection.

    The solution is to delay when outgoing completes, perhaps even delaying it forever (or at least until the application is killed).

    Two standard sources are potentially useful for delaying completion in the scenario where you don't want to send messages through the websocket.

    • Source.maybe materializes as a Promise which you can complete with an optional terminating message. It will not complete unless and until the promise is completed.

    • Source.never never completes. You could get the same effect by never completing the promise from Source.maybe, but Source.never has less overhead.

    So what would it look like in code?

    val outgoing =
      Source.single(TextMessage("hello world!"))
        .concat(Source.never)
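
To see the difference in completion behaviour without touching the network, here is a minimal sketch. It assumes Akka 2.6 with akka-http on the classpath; CompletionDemo and completesWithin are hypothetical names used only for illustration, and Sink.ignore stands in for the WebSocket flow.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object, not part of the Akka docs sample
object CompletionDemo {
  implicit val system: ActorSystem = ActorSystem("completion-demo")

  // Completes right after emitting its single element
  val finite: Source[Message, NotUsed] = Source.single(TextMessage("hello world!"))

  // Emits the same element, then stays open because Source.never never completes
  val open: Source[Message, NotUsed] = finite.concat(Source.never[Message])

  // Runs the source into Sink.ignore and reports whether the stream
  // completed before the timeout (Await.ready throws on timeout)
  def completesWithin(source: Source[Message, NotUsed], timeout: FiniteDuration): Boolean =
    Try(Await.ready(source.runWith(Sink.ignore), timeout)).isSuccess

  def main(args: Array[String]): Unit = {
    println(s"Source.single alone completes: ${completesWithin(finite, 3.seconds)}")
    println(s"with Source.never appended completes: ${completesWithin(open, 1.second)}")
    system.terminate()
  }
}
```

The first source finishes almost immediately, which is what makes the WebSocket flow tear the connection down; the second one is still running when the timeout expires.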
    

    For Source.maybe, you'll want .concatMat so that the Promise is available for completion. This does mean the overall materialized value is nested, something like val ((completionPromise, upgradeResponse), closed):

    val outgoing =
      Source.single(TextMessage("hello world!"))
        .concatMat(Source.maybe[TextMessage])(Keep.right)
    
    val ((completionPromise, upgradeResponse), closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.both) // same name as the flow defined in the question
        .toMat(incoming)(Keep.both)
        .run()
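
When you do want to close the connection (for example at application shutdown), you complete the promise. The sketch below shows the two ways to do that without a real server. It assumes Akka 2.6 with akka-http on the classpath; MaybeCompletionDemo and runAndClose are hypothetical names, and Sink.seq stands in for the WebSocket flow plus the incoming sink.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical demo object, not part of the Akka docs sample
object MaybeCompletionDemo {
  implicit val system: ActorSystem = ActorSystem("maybe-demo")

  // Same shape as the outgoing source above; Keep.right exposes the Promise
  val outgoing: Source[Message, Promise[Option[Message]]] =
    Source.single(TextMessage("hello world!"))
      .concatMat(Source.maybe[Message])(Keep.right)

  // Runs the source, completes the promise with `last`, and returns
  // everything the sink received before the stream completed
  def runAndClose(last: Option[Message]): Seq[Message] = {
    val (completionPromise, received) =
      outgoing.toMat(Sink.seq[Message])(Keep.both).run()

    // None: complete without emitting anything further
    // Some(msg): emit msg as a final message, then complete
    completionPromise.success(last)
    Await.result(received, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(runAndClose(None))
    println(runAndClose(Some(TextMessage("bye"))))
    system.terminate()
  }
}
```

In the real client you would call completionPromise.success(None) from your shutdown path instead of right after run().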
    

    If you want to send arbitrarily many messages through the socket, Source.actorRef and Source.queue are handy. With Source.actorRef, you send messages to the materialized actor ref and they go out over the websocket connection; a special message completes the source. With Source.queue, you offer messages to the materialized queue and then complete it when you are done.

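    // additionally needs: import akka.stream.{CompletionStrategy, OverflowStrategy}
    val outgoing =
      Source.actorRef[TextMessage](
        completionMatcher = {
          case Done =>
            CompletionStrategy.draining // deliver messages already buffered before completing
        },
        failureMatcher = PartialFunction.empty,
        bufferSize = 100,
        overflowStrategy = OverflowStrategy.dropNew
      )
    
    val ((sendToSocketRef, upgradeResponse), closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.both) // same name as the flow defined in the question
        .toMat(incoming)(Keep.both)
        .run()
    
    // each message sent to the actor ref goes out over the websocket
    sendToSocketRef ! TextMessage("hello world!")
    // sending Done matches completionMatcher and completes the source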
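
The Source.queue variant mentioned above follows the same pattern. This is a minimal sketch rather than a definitive implementation: it assumes Akka 2.6 with akka-http on the classpath, QueueSourceDemo and sendAll are hypothetical names, and Sink.seq stands in for the WebSocket flow so the example runs without a server.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the Akka docs sample
object QueueSourceDemo {
  implicit val system: ActorSystem = ActorSystem("queue-demo")

  // Offers each text to the queue, completes the queue, and returns
  // what the sink received
  def sendAll(texts: List[String]): Seq[Message] = {
    val (queue, received) =
      Source.queue[Message](bufferSize = 100, overflowStrategy = OverflowStrategy.backpressure)
        .toMat(Sink.seq[Message])(Keep.both)
        .run()

    // offer returns a Future; waiting on each one keeps at most one offer pending
    texts.foreach(text => Await.result(queue.offer(TextMessage(text)), 3.seconds))

    // completing the queue completes the source, which would close the websocket
    queue.complete()
    Await.result(received, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(sendAll(List("hello", "world")))
    system.terminate()
  }
}
```

In the real client, the queue source would replace outgoing in the .viaMat(webSocketFlow)(Keep.both).toMat(incoming)(Keep.both) pipeline, and the connection stays open for as long as you do not call complete().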