Let's say I have a controller that handles commands I receive over a WebSocket.
class WebSocketController(implicit M: Materializer)
    extends Controller
    with CirceSupport {

  override def route: Route = path("ws") {
    handleWebSocketMessages(wsFlow)
  }

  def wsFlow: Flow[Message, Message, Any] =
    Flow[Message].mapConcat {
      case tm: TextMessage =>
        decode[Command](tm.getStrictText) match {
          // todo pass this command to some actor or service
          // and get response and reply back.
          case Right(AuthorizeUser(login, password)) =>
            TextMessage(s"Authorized: $login, $password") :: Nil
          case _ =>
            Nil
        }
      case bm: BinaryMessage =>
        bm.dataStream.runWith(Sink.ignore)
        Nil
    }
}
So I receive a command and deserialize it. The next step is to pass it to some service or actor that returns a Future[SomeReply].
The question is: what is the basic approach to handling such a flow with Akka Streams?
When handling Futures inside a Flow, mapAsync is usually what you're looking for. To add to your specific example:
def asyncOp(msg: TextMessage): Future[SomeReply] = ???
def tailorResponse(r: SomeReply): TextMessage = ???

def wsFlow: Flow[Message, Message, Any] =
  Flow[Message]
    .mapConcat {...}
    .mapAsync(parallelism = 4)(asyncOp)
    // map rather than via: tailorResponse is a plain function, and via expects a Flow
    .map(tailorResponse)
mapAsyncUnordered can also be used if the order of the Futures' results is not relevant.
The parallelism parameter indicates how many Futures can run at the same time before the stage backpressures.
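To see the mapAsync stage in isolation, here is a minimal sketch that leaves out the WebSocket Message types and only needs akka-stream. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream. The names MapAsyncSketch, authorize, replyFlow and run are hypothetical, and the AuthorizeUser and SomeReply case classes are stand-ins for the types in the question, not the real ones.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object MapAsyncSketch {
  // Hypothetical stand-ins for the Command and SomeReply types in the question.
  final case class AuthorizeUser(login: String, password: String)
  final case class SomeReply(text: String)

  // Hypothetical service call. A real one might ask an actor or query a database.
  def authorize(cmd: AuthorizeUser)(implicit ec: ExecutionContext): Future[SomeReply] =
    Future(SomeReply(s"Authorized: ${cmd.login}"))

  // At most 4 Futures are in flight at once. mapAsync emits results in
  // upstream order, regardless of which Future completes first.
  def replyFlow(implicit ec: ExecutionContext): Flow[AuthorizeUser, String, NotUsed] =
    Flow[AuthorizeUser]
      .mapAsync(parallelism = 4)(cmd => authorize(cmd))
      .map(_.text)

  // Runs two commands through the flow and collects the replies.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("map-async-sketch")
    import system.dispatcher
    try {
      val replies = Source(List(AuthorizeUser("alice", "a"), AuthorizeUser("bob", "b")))
        .via(replyFlow)
        .runWith(Sink.seq)
      Await.result(replies, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Replacing mapAsync with mapAsyncUnordered in replyFlow keeps the same signature but emits each reply as soon as its Future completes, so the output order is no longer guaranteed.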