Tags: scala, websocket, akka-stream, akka-http

Akka HTTP: Working with WebSockets - Patterns for extending the Flow


The Akka HTTP WebSocket client works really well. It takes a Flow[Message, Message, Mat], for example a Flow[Message, Message, Future[Done]].

We could create a Source, val src = Source.maybe[Message], and a Sink, val snk = Sink.foreach[Message](...), and combine them with Flow.fromSinkAndSourceMat(snk, src) plus a combiner such as Keep.left. Here src feeds outgoing messages to the WebSocket (it is the source of the flow), and snk receives the incoming messages.
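For reference, the baseline described above might look like the following sketch. It assumes Akka 2.6+ with Akka HTTP on the classpath; the object name BaselineFlow is hypothetical, and Keep.both is just one possible choice of combiner.

```scala
import akka.Done
import akka.http.scaladsl.model.ws.Message
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Future, Promise}

// Hypothetical container object, used only to group the definitions.
object BaselineFlow {
  // Incoming side: print every message received from the server.
  val snk: Sink[Message, Future[Done]] =
    Sink.foreach[Message](msg => println(msg))

  // Outgoing side: emits nothing until the materialized promise is completed.
  val src: Source[Message, Promise[Option[Message]]] =
    Source.maybe[Message]

  // Keep.both exposes the sink's completion future and the source's promise.
  val flow: Flow[Message, Message, (Future[Done], Promise[Option[Message]])] =
    Flow.fromSinkAndSourceMat(snk, src)(Keep.both)
}
```

Completing the materialized promise with None completes the outgoing source, which is one way to close the client side when only incoming messages matter.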

There will be scenarios where we want to extend the flow we pass in. For example, we might want to receive a Message, send it through another Flow for JSON parsing or data validation, and only then pass it to a sink.

How do I construct the Flow that I pass to Http().singleWebSocketRequest(WebSocketRequest("ws://someip:port"), flow) so that it includes various other flow stages followed by a final sink? I can set my source to Source.maybe, as I only care about incoming messages.


Solution

  • You can compose your Sink as a series of Flows, and a final Sink.

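    val flow1: Flow[Message, Message, NotUsed] = ???
    val flow2: Flow[Message, String, NotUsed] = ???
    val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

    // .to keeps the left materialized value, so the Future[Done] of sink is dropped;
    // use .toMat(sink)(Keep.right) instead if you need it.
    val megaSink: Sink[Message, NotUsed] = flow1.via(flow2).to(sink)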
    

    Your composed Sink can then be used to put together your WS flow, similarly to what you already have:

    Flow.fromSinkAndSource(megaSink, Source.maybe[Message])
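
Putting the pieces together, a fuller version could look like the sketch below. It is only an illustration under stated assumptions: Akka 2.6+ and Akka HTTP 10.2+, a hypothetical endpoint ws://localhost:8080/ws, and hypothetical stage names (extractText, validate) that stand in for real JSON parsing or validation. For simplicity it handles only strict text messages; streamed and binary messages are dropped without being drained, which a production client would need to deal with.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Future

// Hypothetical object name; groups the stages and the client entry point.
object WsPipeline {
  // Stage 1: keep only strict text messages and unwrap their payload.
  val extractText: Flow[Message, String, NotUsed] =
    Flow[Message].collect { case TextMessage.Strict(text) => text }

  // Stage 2: placeholder validation (trim, then drop empty payloads).
  val validate: Flow[String, String, NotUsed] =
    Flow[String].map(_.trim).filter(_.nonEmpty)

  // Final sink: print each validated payload.
  val printSink: Sink[String, Future[Done]] = Sink.foreach[String](println)

  // toMat with Keep.right retains the sink's Future[Done].
  val incoming: Sink[Message, Future[Done]] =
    extractText.via(validate).toMat(printSink)(Keep.right)

  // Source.maybe never emits here, so the client only listens.
  val wsFlow: Flow[Message, Message, Future[Done]] =
    Flow.fromSinkAndSourceMat(incoming, Source.maybe[Message])(Keep.left)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ws-client")
    import system.dispatcher

    // Hypothetical endpoint; replace with a real ws:// URI.
    val (upgrade, closed) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/ws"), wsFlow)

    upgrade.foreach(r => println(s"Upgrade status: ${r.response.status}"))
    // Shut down once the incoming side completes or fails.
    closed.onComplete(_ => system.terminate())
  }
}
```

Keeping each stage as its own Flow value means the stages can be exercised against an in-memory Source without opening a WebSocket connection.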