Search code examples
scala akka akka-stream

How to keep the connection open to websocket server?


I have the following code, which does not keep the connection to the WebSocket server open:


import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent._
import scala.util.{Failure, Success}

/** Companion of [[WsActor]]; holds the factory for its `Props`. */
object WsActor {

  /** `Props` that instantiate a new [[WsActor]] through its no-argument constructor. */
  def props: Props = Props(new WsActor())
}

/**
 * Actor that sends the payload of every received `Msg` to the WebSocket endpoint
 * `ws://127.0.0.1:7000/ws` and prints the text frames the server sends back.
 *
 * The protocol messages (`Initialized`, `Completed`, `Msg`, `Failed`, `Ack`) are imported
 * from `com.sweetsoft.WsConnector`; presumably they implement an ack-based actor-sink
 * protocol -- confirm against `WsConnector`.
 *
 * NOTE: each `Msg` calls `Http().singleWebSocketRequest`, i.e. a new WebSocket connection
 * is opened for every single message.
 */
final class WsActor extends Actor with ActorLogging {

  import com.sweetsoft.WsConnector._

  implicit val materializer: Materializer = ActorMaterializer()
  implicit val ec: ExecutionContextExecutor = context.system.dispatcher
  // Actor system that `Http()` below resolves implicitly.
  implicit val actor = context.system

  // Future[Done] is the materialized value of Sink.foreach,
  // emitted when the stream completes.
  // Strict text frames are printed; every other message kind is only reported.
  private val incoming: Sink[Message, Future[Done]] =
    Sink.foreach[Message] {
      case message: TextMessage.Strict =>
        println(message.text)
      case _ =>
        println("Unknown messages.")
    }

  //private val outgoing: Source[Message, Promise[Option[Message]]] =
  //  Source.maybe[Message]

  //  val flow: Flow[Message, Message, Promise[Option[Message]]] =
  //    Flow.fromSinkAndSourceMat(incoming, Source.maybe[Message])(Keep.right)


  log.info("Websocket actor started.")

  override def receive: Receive = {
    case Initialized =>
      log.info("Initialization to receive messages via stream.")
      sender() ! Ack

    case Completed =>
      log.info("Streams completed.")
      sender() ! Ack

    case Msg(value) =>
      // Capture the sender now: the reply is sent from a Future callback,
      // where calling sender() would no longer be safe.
      val replyTo = sender()

      // Outgoing side: the single text message followed by Source.maybe, whose promise
      // is never completed here, so this side is not completed after the message.
      val outgoing: Source[Message, Promise[Option[Message]]] =
        Source.single(TextMessage(value)).concatMat(Source.maybe[Message])(Keep.right)
      val flow: Flow[Message, Message, Promise[Option[Message]]] =
        Flow.fromSinkAndSourceMat(incoming, outgoing)(Keep.right)

      // The flow is passed as-is; wrapping it in an identity `mapAsync` stage
      // only added scheduling overhead.
      val (upgradeResponse, _) =
        Http().singleWebSocketRequest(WebSocketRequest("ws://127.0.0.1:7000/ws"), flow)

      upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols)
          Future.successful(Done)
        else
          Future.failed(new RuntimeException(s"Connection failed: ${upgrade.response.status}"))
      }.onComplete {
        case Success(_) =>
          replyTo ! Ack
          log.info("Done")
        case Failure(ex) =>
          // Log the throwable itself so the stack trace is kept and a null
          // exception message cannot produce an empty log entry.
          log.error(ex, "Connection to websocket server failed.")
      }

    case Failed(ex) =>
      log.info(s"Stream failed with ${ex.getMessage}.")
  }

}

So every time a message is received, it closes the connection and opens a new one for the next request.
The question is, how can I keep the connection open?


Solution

  • Use Http().webSocketClientFlow instead of Http().singleWebSocketRequest

    Http().webSocketClientFlow will give you a Flow[Message, Message, Future[WebSocketUpgradeResponse]]

    This will not create a new connection every time.

    You should declare it in the companion object, so every instance of the class can use the same connection.

    First, declare the actor system for the entire application in a separate package.

    /** Holds the implicit `ActorSystem` (named "test") that other components import. */
    object ActorEssentials {
      // Explicit type annotation: implicit definitions should always declare their type
      // so that implicit resolution does not depend on definition order.
      implicit val actorSystem: ActorSystem = ActorSystem("test")
    }
    

    Then you can declare the following

    // Companion of WsActor: the Props factory plus a WebSocket client flow
    // created once when this object is initialised.
    object WsActor {
      // Brings the members of ActorEssentials (used below: `actorSystem`) into scope.
      import ActorEssentials._
      // Props that create WsActor instances via its class.
      def props: Props = Props[WsActor]
      // The request argument is elided in this snippet.
      // NOTE(review): the Akka HTTP documentation appears to state that a flow returned
      // by webSocketClientFlow can only be materialized once -- confirm before sharing
      // this val between several actor instances.
      val flow = Http()(actorSystem).webSocketClientFlow(...) 
    }