Search code examples
scalaakkaakka-streamakka-http

How to broadcast the received messages to two different flows


How to broadcast the received messages to two different flows

I am using akka stream websocket client to request and receive the data websocket server. With the received data from the websocket, I would like to broadcast into two different flows. The image below, should clarify the scenario:

enter image description here

As you can see on the image, it should be broadcasted to two different flows subsequently to seperate sink.

The websocket client can be created as the following:

import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {
  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
      }

    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))

    // flow to use (note: not re-usable!)
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

Solution

  • You can use SinkShape to get the required flow

    Sink.fromGraph(GraphDSL.create(){
      implicit b =>
        val bcast = b.add(Broadcast[Message](2))
        val flow1 = b.add(Flow[Message].map(m => m))
        val flow2 = b.add(Flow[Message].map(m => m ))
        val sink1 = b.add(Sink.foreach(println))
        val sink2 = b.add(Sink.foreach(println))
    
        bcast ~> flow1 ~> sink1
        bcast ~> flow2 ~> sink2
    
        SinkShape(bcast.in)
    })
    

    The entire code is

      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()
      import system.dispatcher
    
      // Future[Done] is the materialized value of Sink.foreach,
      // emitted when the stream completes
    
      val incomingSink = Sink.fromGraph(GraphDSL.create() {
        implicit b =>
          import GraphDSL.Implicits._
          val bcast = b.add(Broadcast[Message](2))
          val flow1 = b.add(Flow[Message].map(m => m))
          val flow2 = b.add(Flow[Message].map(m => m ))
          val sink1 = b.add(Sink.head[Message])
          val sink2 = b.add(Sink.head[Message])
    
          bcast ~> flow1 ~> sink1
          bcast ~> flow2 ~> sink2
    
          SinkShape(bcast.in)
      }).mapMaterializedValue(_ => Future(Done))
      // send this as a message over the WebSocket
      val outgoing = Source.single(TextMessage("hello world!"))
    
      // flow to use (note: not re-usable!)
      val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
    
      // the materialized value is a tuple with
      // upgradeResponse is a Future[WebSocketUpgradeResponse] that
      // completes or fails when the connection succeeds or fails
      // and closed is a Future[Done] with the stream completion from the incoming sink
      val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incomingSink)(Keep.both) // also keep the Future[Done]
        .run()
    
      // just like a regular http request we can access response status which is available via upgrade.response.status
      // status code 101 (Switching Protocols) indicates that server support WebSockets
      val connected = upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
          Future.successful(Done)
        } else {
          throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
        }
      }
    
      // in a real application you would not side effect here
      connected.onComplete(println)
      closed.foreach(_ => println("closed"))