At the moment I use a websocket with
The server pushes out keepAlive messages and also answers requests from the client.
Now I would like to spice things up a bit and add the possibility to handle n clients.
So I had a look at:
https://github.com/playframework/play-scala-chatroom-example
This is basically an n-Inlet ~> n-Outlet setup,
so if any of the n clients writes something through its websocket, all of them get notified (including the sender).
What I need is a bit more sophisticated as the server should
So it's basically just one step in between in my abstract way of thinking.
Naive as I am, I thought it could maybe be done by:
type AllowedWSMessage = String
val myActor = system.actorOf(Props{new myActor}, "myActor")
val myActorSink = Sink.actorRefWithAck(myActor, "init", "acknowledged", "completed")
import scala.concurrent.duration._
val tickingSource: Source[AllowedWSMessage, Cancellable] =
  Source.tick(initialDelay = 1.second, interval = 10.seconds, tick = NotUsed)
    .map(_ => "Staying Alive")
val serverMessageSource = Source
  .queue[AllowedWSMessage](10, OverflowStrategy.backpressure)
  .mapMaterializedValue { queue => myActor ! InitTunnel(queue) }
val serverSource: Source[AllowedWSMessage, Cancellable] = tickingSource.merge(serverMessageSource)
private val (clientSink, clientSource) = {
  // Don't log MergeHub$ProducerFailed as error if the client disconnects.
  // recoverWithRetries -1 is essentially "recoverWith"
  val source = MergeHub.source[AllowedWSMessage]
    .log("source")
    .recoverWithRetries(-1, { case _: Exception ⇒ Source.empty })
  val sink: Sink[AllowedWSMessage, Source[AllowedWSMessage, NotUsed]] =
    BroadcastHub.sink[AllowedWSMessage]
  source.via(serverSource).toMat(sink)(Keep.both).run()
}
(Note the source.via(serverSource)...)
but of course it's not that easy.
In the end what I want is basically:
(Client -> WebSocket ->) MergeHub ~> myActor ~> BroadcastHub (-> WebSocket -> Client)
Now I wonder, what is an elegant way of doing this? Or are MergeHub and BroadcastHub the wrong tools for that challenge?
You already have your server source and sink, and you said they work, so I won't dig into them.
val fanIn = MergeHub.source[AllowedWSMessage].to(myActorSink).run()
val fanOut = serverSource.toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.right).run()
// Now, somewhere in a (route-)method where you handle the websocket connection
Flow.fromSinkAndSource(fanIn, fanOut)
Easy as that, hope the knot in your head unravels now :)
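For anyone who wants to try the hub wiring in isolation, here is a minimal sketch of the same fan-in/fan-out shape. It is not the exact setup from the question: the actor is replaced by a plain Flow called serverLogic (a hypothetical stand-in), the names HubSketch, clientFlow and demo are made up for illustration, and it assumes Akka 2.6+, where an implicit ActorSystem is enough to materialize streams.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, MergeHub, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object HubSketch {
  // Assumption: Akka 2.6+, so the materializer is derived from the implicit system.
  implicit val system: ActorSystem = ActorSystem("hub-sketch")

  type AllowedWSMessage = String

  // Hypothetical stand-in for myActor: whatever the server does with each message.
  val serverLogic: Flow[AllowedWSMessage, AllowedWSMessage, NotUsed] =
    Flow[AllowedWSMessage].map(msg => s"server saw: $msg")

  // Run the shared graph once: n producers ~> serverLogic ~> n consumers.
  // fanIn is a reusable Sink, fanOut a reusable Source.
  val (fanIn, fanOut) =
    MergeHub.source[AllowedWSMessage]
      .via(serverLogic)
      .toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.both)
      .run()

  // One of these per websocket connection.
  def clientFlow: Flow[AllowedWSMessage, AllowedWSMessage, NotUsed] =
    Flow.fromSinkAndSource(fanIn, fanOut)

  // Simulates a single client: subscribe to the hub, send one message,
  // and wait for the first broadcast element to come back.
  def demo(): AllowedWSMessage = {
    val reply = fanOut.runWith(Sink.head)
    Source.single("hello").runWith(fanIn)
    Await.result(reply, 5.seconds)
  }
}
```

The important part is that the shared graph is run exactly once, while clientFlow can be handed out to as many connections as you like. Keep in mind that a consumer attaching to the BroadcastHub late may miss elements that were broadcast before it subscribed.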