scala playframework websocket akka akka-stream

Akka MergeHub and BroadcastHub via Actor to support multiple clients through websockets


At the moment I use a websocket with

  • 1 client
  • 2 sources on server side

The server pushes out keepAlive messages and also answers requests from the client.

Now I would like to spice things up a bit and add the possibility to handle n clients.

So I had a look at:

https://github.com/playframework/play-scala-chatroom-example

This is basically an n-Inlet ~> n-Outlet setup: if any of the n clients writes something through its websocket, all of them get notified (including the sender).

What I need is a bit more sophisticated as the server should

  1. still send keepAlive messages to all the connected clients AND
  2. if one of the clients asks for something/triggers a server-side "event", again all the clients should be notified of this.

So conceptually it is just one extra step in between.

Naive as I am, I thought it could maybe be done by:

type AllowedWSMessage = String

// Actor that receives the client messages; the sink uses the "init" / "acknowledged" / "completed" protocol.
val myActor = system.actorOf(Props{new myActor}, "myActor")
val myActorSink = Sink.actorRefWithAck(myActor, "init", "acknowledged", "completed")
import scala.concurrent.duration._

// Keep-alive message every 10 seconds, starting after 1 second.
val tickingSource: Source[AllowedWSMessage, Cancellable] =
  Source.tick(initialDelay = 1.second, interval = 10.seconds, tick = NotUsed)
    .map(_ => "Staying Alive")

// Queue-backed source; the materialized queue is handed to the actor so it can push messages.
val serverMessageSource = Source
  .queue[AllowedWSMessage](10, OverflowStrategy.backpressure)
  .mapMaterializedValue { queue => myActor ! InitTunnel(queue) }

val serverSource: Source[AllowedWSMessage, Cancellable] = tickingSource.merge(serverMessageSource)

private val (clientSink, clientSource) =
{
    // Don't log MergeHub$ProducerFailed as error if the client disconnects.
    // recoverWithRetries -1 is essentially "recoverWith"
    val source = MergeHub.source[AllowedWSMessage]
      .log("source")
      .recoverWithRetries(-1, { case _: Exception ⇒ Source.empty})

    val sink: Sink[AllowedWSMessage, Source[AllowedWSMessage, NotUsed]] = BroadcastHub.sink[AllowedWSMessage]
    source.via(serverSource).toMat(sink)(Keep.both).run()
  }

(Note the source.via(serverSource)...)

but of course it's not that easy.

In the end what I want is basically:

(Client -> WebSocket ->) MergeHub ~> myActor ~> BroadcastHub (-> WebSocket -> Client)

Now I wonder, what is an elegant way of doing this? Or are MergeHub and BroadcastHub the wrong tools for that challenge?


Solution

  • You already have your server source and sink, and you said they work, so I won't dig into them.

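    // Both lines need an implicit Materializer in scope.
    // Fan-in: every client writes into this one Sink, which feeds the actor.
    val fanIn = MergeHub.source[AllowedWSMessage].to(myActorSink).run()
    // Fan-out: every client reads from this one Source (keep-alives plus actor output).
    val fanOut = serverSource.toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.right).run()

    // Now, somewhere in a (route-)method where you handle the websocket connection

    Flow.fromSinkAndSource(fanIn, fanOut)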
    

    Easy as that, hope the knot in your head unravels now :)
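
For reference, the wiring above could be put together roughly as follows. This is only a sketch under assumptions, not the asker's actual code: it targets Akka 2.5/2.6 classic actors (where Sink.actorRefWithAck exists, deprecated in 2.6), and MyActor, the shape of InitTunnel, the build helper, the "event: " prefix and the dropHead overflow strategy are all made up for illustration. The fan-out side is run first so the actor receives the queue before any client message arrives.

```scala
import akka.NotUsed
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, MergeHub, Sink, Source, SourceQueueWithComplete}
import scala.concurrent.Await
import scala.concurrent.duration._

object HubViaActorSketch {
  type AllowedWSMessage = String

  // Hypothetical message that hands the server-side queue to the actor.
  final case class InitTunnel(queue: SourceQueueWithComplete[AllowedWSMessage])

  // Hypothetical stand-in for the question's actor: it turns every client
  // message into a server-side "event" that is pushed to all clients.
  class MyActor extends Actor {
    private var out: Option[SourceQueueWithComplete[AllowedWSMessage]] = None

    def receive: Receive = {
      case InitTunnel(queue) => out = Some(queue)
      case "init"            => sender() ! "acknowledged" // handshake from the sink
      case "completed"       => context.stop(self)
      case msg: String =>
        out.foreach(_.offer(s"event: $msg")) // push into the fan-out side
        sender() ! "acknowledged"            // ask the sink for the next element
    }
  }

  def build()(implicit system: ActorSystem, mat: Materializer)
      : Flow[AllowedWSMessage, AllowedWSMessage, NotUsed] = {
    val myActor = system.actorOf(Props(new MyActor), "myActor")
    val myActorSink =
      Sink.actorRefWithAck[AllowedWSMessage](myActor, "init", "acknowledged", "completed")

    val tickingSource = Source.tick(1.second, 10.seconds, "Staying Alive")
    // dropHead is an assumption here (the question uses backpressure); it lets
    // the actor call offer without waiting for the previous offer to complete.
    val serverMessageSource = Source
      .queue[AllowedWSMessage](10, OverflowStrategy.dropHead)
      .mapMaterializedValue(queue => myActor ! InitTunnel(queue))
    val serverSource = tickingSource.merge(serverMessageSource)

    // Run the fan-out first so InitTunnel reaches the actor before client input.
    val fanOut = serverSource.toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.right).run()
    val fanIn  = MergeHub.source[AllowedWSMessage].to(myActorSink).run()

    Flow.fromSinkAndSource(fanIn, fanOut)
  }
}

object HubViaActorDemo extends App {
  implicit val system: ActorSystem = ActorSystem("hub-sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  val clientFlow = HubViaActorSketch.build()

  // Simulate one client: send a message and wait for the first thing broadcast back.
  val firstReply = Source.single("hello").via(clientFlow).runWith(Sink.head)
  println(Await.result(firstReply, 5.seconds))

  system.terminate()
}
```

In a Play controller the same flow would be returned for every connection, for example via WebSocket.accept[String, String] { _ => clientFlow }, so that all clients share the single MergeHub sink and BroadcastHub source.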