Search code examples
scalawebsocketakkaworkflowakka-stream

Akka-http approach to handle websocket commands


Let's say I have controller that handles commands that I receive from websocket.

class WebSocketController(implicit M: Materializer)
  extends Controller
    with CirceSupport {

  override def route: Route = path("ws") {
    handleWebSocketMessages(wsFlow)
  }

  def wsFlow: Flow[Message, Message, Any] =
    Flow[Message].mapConcat {
      case tm: TextMessage =>
        decode[Command](tm.getStrictText) match {
          // todo pass this command to some actor or service 
          // and get response and reply back.  
          case Right(AuthorizeUser(login, password)) =>
            TextMessage(s"Authorized: $login, $password") :: Nil
          case _ =>
            Nil
        }

      case bm: BinaryMessage =>
        bm.dataStream.runWith(Sink.ignore)
        Nil
    }

}

So, I get a command, deserialize it and next step I want to do is to pass it to some service or actor that will return me Future[SomeReply].

The question is: What is the basic approach of handling such flow with akka streams?


Solution

  • When handling Futures inside a Flow, mapAsync is usually what you're looking for. To add to your specific example:

    def asyncOp(msg: TextMessage): Future[SomeReply] = ???
    def tailorResponse(r: SomeReply): TextMessage = ???
    
    def wsFlow: Flow[Message, Message, Any] =
      Flow[Message]
       .mapConcat {...}
       .mapAsync(parallelism = 4)(asyncOp)
       .via(tailorResponse)
    

    mapAsyncUnordered can also be used, in case the order of the Futures result is not relevant. Parallelism indicates how many Futures can be run at the same time, before the stage backpressures.

    See also

    • stage docs
    • how to use in conjunction with ask - here