I have the following code, that does not keep the connection open to the websocket server:
import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent._
import scala.util.{Failure, Success}
object WsActor {
def props: Props = Props(new WsActor)
}
final class WsActor extends Actor with ActorLogging {
import com.sweetsoft.WsConnector._
implicit val materializer: Materializer = ActorMaterializer()
implicit val ec: ExecutionContextExecutor = context.system.dispatcher
implicit val actor = context.system
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
private val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
println("Unknown messages.")
}
//private val outgoing: Source[Message, Promise[Option[Message]]] =
// Source.maybe[Message]
// val flow: Flow[Message, Message, Promise[Option[Message]]] =
// Flow.fromSinkAndSourceMat(incoming, Source.maybe[Message])(Keep.right)
log.info("Websocket actor started.")
override def receive: Receive = {
case Initialized =>
log.info("Initialization to receive messages via stream.")
sender() ! Ack
case Completed =>
log.info("Streams completed.")
sender() ! Ack
case Msg(value) =>
val replyTo = sender()
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(incoming, Source.single(TextMessage(value)).concatMat(Source.maybe[Message])(Keep.right))(Keep.right)
val (upgradeResponse, _) =
Http().singleWebSocketRequest(WebSocketRequest("ws://127.0.0.1:7000/ws"), flow.mapAsync(4)(msg => Future(msg)))
upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}.onComplete {
case Success(_) =>
replyTo ! Ack
log.info("Done")
case Failure(ex) => log.error(ex.getMessage)
}
case Failed(ex) =>
log.info(s"Stream failed with ${ex.getMessage}.")
}
}
So every time, when a message is received, it will close the connection and open a new connection for the next request.
The question is, how can I keep the connection open?
Http().webSocketClientFlow
in stead of Http().singleWebSocketRequest
Http().webSocketClientFlow
will give you a Flow Flow[Message, Message, Future[WebSocketUpgradeResponse]]
This will not create a new connection every time.
You should declare it in the companion object, so every instance of the class can use the same connection.
First declare your actor system for entire application in a separate package.
object ActorEssentials {
implicit val actorSystem = ActorSystem("test")
}
Then you can declare the following
object WsActor {
import ActorEssentials._
def props: Props = Props[WsActor]
val flow = Http()(actorSystem).webSocketClientFlow(...)
}