Search code examples
scalawebsocketplayframeworkakka-http

Migration from Play Websocket to Akka HTTP Websocket


I am migrating from Play to Akka HTTP. I have jar dependency code which I am not able to change which accepts a

Flow[Array[Byte],Array[Byte],Any] 

which is what is provided by Play for a WebSocket connection. In Akka HTTP the definition is

Flow[Message,Message,Any] 

I need a translation between the 2 definitions. I am new to Akka http so I am not exactly sure how to proceed. In play I was also using ActorFlow.actorRef

handleWebSocketMessages(wsFlow)

def wsFlow: Flow[Message, Message, Any] = {
 ActorFlow.actorRef(websocket => MyBridgeActor.props(websocket))
}

ActorFlow code only has a dependency on akka so I have just copied the file to my own code base. https://github.com/playframework/playframework/blob/master/framework/src/play-streams/src/main/scala/play/api/libs/streams/ActorFlow.scala

I guess a solution would be to create a CustomActorFlow which will includes the conversion from Message to Array[Byte]. MyBridgeActor accepts the websocket in Flow[Array[Byte],Array[Byte],Any] format.


Solution

  • Using akka stream api, you can convert the flow like:

    import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
    import akka.stream.Materializer
    import akka.stream.scaladsl.{Flow, Sink}
    import akka.util.ByteString
    
    import scala.concurrent.Future
    
    handleWebSocketMessages(msgFlow)
    
    def msgFlow: Flow[Message, Message, Any] = convertFlow(bytesFlow)
    
    def bytesFlow: Flow[Array[Byte], Array[Byte], Any] = {
      // Can just copy ActorFlow over, no need to customize
      ActorFlow.actorRef[Array[Byte], Array[Byte]](...)
    }
    
    def covertFlow(msgs: Flow[Array[Byte], Array[Byte], Any])(implicit materializer: Materializer): Flow[Message, Message, Any] =
      Flow[Message]
        .mapAsync(2)(msgToBytes)
        .via(msgs)
        .map(bytesToMsg)
    
    
    def bytesToMsg(bytes: Array[Byte]): Message = {
      // This depends on your application:
      //   is the outgoing message text or binary?
      val isText = true
      if (isText) {
        TextMessage(new String(bytes, "UTF-8"))
      } else {
        BinaryMessage(ByteString(bytes))
      }
    }
    
    def msgToBytes(msg: Message)(implicit materializer: Materializer): Future[Array[Byte]] = {
      msg match {
        case TextMessage.Strict(data) =>
          Future.successful(data.getBytes("UTF-8"))
        case TextMessage.Streamed(stream) =>
          stream.fold("")(_ + _).map(_.getBytes("UTF-8")).runWith(Sink.head)
        case BinaryMessage.Strict(data) =>
          Future.successful(data.toArray[Byte])
        case BinaryMessage.Streamed(stream) =>
          stream.fold(ByteString.empty)(_ ++ _).map(_.toArray[Byte]).runWith(Sink.head)
      }
    }