scala, websocket, akka-stream

Akka Streams WebSocket to send info on arbitrary events


I want to implement a service where a number of clients can connect to a server using a WebSocket. The server should be able to broadcast messages to all the connected clients on arbitrary internal events. So far I have this code:

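import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.RouteResult.route2HandlerFlow
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.duration._

implicit val system = ActorSystem("Server")
implicit val mat = ActorMaterializer()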

// The source to broadcast (just ints for simplicity)
val dataSource = Source(1 to 1000).throttle(1, 1.second, 1, ThrottleMode.Shaping).map(_.toString)

// Go via BroadcastHub to allow multiple clients to connect
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
  dataSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)

val producer: Source[String, NotUsed] = runnableGraph.run()

// Optional - add sink to avoid backpressuring the original flow when no clients are attached
producer.runWith(Sink.ignore)

val wsHandler: Flow[Message, Message, NotUsed] =
  Flow[Message]
    .mapConcat(_ => Nil) // Ignore any data sent from the client
    .merge(producer)  // Stream the data we want to the client
    .map(l => TextMessage(l.toString))

val route =
  path("ws") {
    handleWebSocketMessages(wsHandler)
  }

val port = 8080

println("Starting up route")
Http().bindAndHandle(route2HandlerFlow(route), "127.0.0.1", port)
println(s"Started HTTP server on port $port")

It successfully broadcasts current ticks to the connected clients. How should I modify this code to be able to also broadcast arbitrary messages, not just scheduled ticks?

Clarification:

By "arbitrary messages" I don't mean other sources like a file or a database, but rather the ability to send a message to a specialized Source and have it relayed to the currently connected clients. Such a message may result from some internal system event, which can happen at any time.


Solution

  • One idea is to use Source.actorRef:

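    // Materializes an ActorRef (the entry point for messages) and a hub Source
    // that any number of WebSocket connections can attach to
    val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
      .toMat(BroadcastHub.sink[String])(Keep.both)
      .run()

    // Same handler as in the question, now fed from the hub Source
    val wsHandler: Flow[Message, Message, NotUsed] = Flow[Message]
      .mapConcat(_ => Nil) // Ignore any data sent from the client
      .merge(source)
      .map(l => TextMessage(l.toString))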
    

    Messages sent to the materialized ActorRef are emitted if there is downstream demand. If there is no downstream demand, the elements are buffered (up to 10 here), and the provided overflow strategy is applied once the buffer is full. Note that there is no backpressure with this approach: senders are never slowed down, so elements are dropped according to the overflow strategy rather than held back. You can forward the elements of another Source to this actor and also send it arbitrary messages at any time:

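    // Relay the scheduled ticks through the actor
    // (runForeach needs an implicit Materializer in scope)
    Source(1 to 1000)
      .throttle(1, 1.second, 1, ThrottleMode.Shaping)
      .map(_.toString)
      .runForeach(msg => actor ! msg)

    // Arbitrary messages, e.g. triggered by internal events
    actor ! "bacon"
    actor ! "ribeye"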
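
To check the wiring without starting an HTTP server, here is a minimal sketch that attaches a single plain stream consumer to the hub (standing in for one WebSocket client) and collects what it receives. The names `HubSketch` and `relay` are made up for illustration and are not part of Akka. It assumes the same Akka 2.5-style APIs used above (`ActorMaterializer`, two-argument `Source.actorRef`), which are deprecated in later versions.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object HubSketch {
  // Hypothetical helper: pushes `events` through an actor-backed BroadcastHub
  // and returns what a single subscriber received.
  // Keep `events` smaller than the actor buffer (10) so nothing is dropped.
  def relay(events: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("HubSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Same construction as in the answer above
      val (actor, hub) =
        Source.actorRef[String](10, OverflowStrategy.dropTail)
          .toMat(BroadcastHub.sink[String])(Keep.both)
          .run()

      // Stand-in for one connected client: take as many elements as we send
      val received = hub.take(events.size.toLong).runWith(Sink.seq)

      // "Arbitrary internal events" are just messages to the ActorRef
      events.foreach(actor ! _)

      Await.result(received, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling `HubSketch.relay(List("bacon", "ribeye"))` should hand back the two strings in the order they were sent. In the real service, the `actor` reference would be passed to whichever component detects the internal events, and each WebSocket connection would merge the hub Source as shown in `wsHandler`.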