Assume I have an actor, UserActor, that knows how to handle incoming messages and how to send new ones. I want to handle WebSockets in Akka HTTP, so I am creating a Flow[Message, Message, NotUsed].
Here we receive new messages as JSON and send them to UserActor. When the source completes, I receive SourceDied:
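val incomingMessages: Sink[Message, NotUsed] =
  Flow[Message]
    .mapAsync(1) {
      // Collapse both strict and streamed text frames into a plain String
      case TextMessage.Strict(text) => Future.successful(text)
      case TextMessage.Streamed(msg) => msg.runFold("")(_ + _)
    }
    // ... JSON decoding step that yields an Either (not shown) ...
    .collect { case Right(msg) => msg }
    .to(Sink.actorRef[ChatMessage](userActor, SourceDied))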
Here I register an out actor with my UserActor, to which it will send messages:
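val outgoingMessages: Source[Message, NotUsed] =
  Source
    .actorRef[ChatMessage](bufferSize, overflowStrategy) // assumed: buffer size and overflow strategy are not given in the question
    .mapMaterializedValue { outActor =>
      userActor ! Connect(outActor)
      NotUsed
    }
    .map((x: ChatMessage) => OutgoingMessage.fromMessage(x))
    .map((outMsg: OutgoingMessage) => TextMessage(outMsg.asJson.toString))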
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
However, there is one UserActor per user, and each user can have multiple sockets open simultaneously. So I simply collect the outs in a Set inside UserActor and send messages to each of them. This works well.
But when a source sends me its terminating message (SourceDied in my case), I don't know which out that source was paired with, so I can't decide which out I should inform about completion and then remove from my outs.
One idea is to change your Flow to take a unique identifier for each connection:
def websocketFlow(connectionId: String): Flow[Message, Message, NotUsed] = {
  val incomingMessages: Sink[Message, NotUsed] =
    // ... same stages as in the question ...
      .to(Sink.actorRef[ChatMessage](userActor, SourceDied(connectionId)))
  val outgoingMessages: Source[Message, NotUsed] =
    // ... same source as in the question ...
      .mapMaterializedValue { outActor =>
        userActor ! Connect(connectionId, outActor)
        NotUsed
      }
      // ... same mapping stages as in the question ...
  Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
}
Obviously, you would need to adjust the SourceDied and Connect messages to include a connection ID (which in this case could be generated with something like java.util.UUID.randomUUID.toString, for example). Then, in UserActor, replace the Set with a Map whose keys are the connection IDs. Using a Map will enable you to look up connection actors and remove them as needed.
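As a rough illustration of the Map-based bookkeeping, here is a minimal sketch in plain Scala with no Akka dependency. The names Out, Connections, connect, sourceDied, and all are hypothetical and exist only for this example; Out stands in for the ActorRef of one socket's out actor. In a real UserActor, the Connect and SourceDied handlers would perform the same two Map operations.

```scala
import java.util.UUID

// Hypothetical stand-in for the ActorRef of one socket's `out` actor.
final case class Out(label: String)

// Immutable registry of open connections, keyed by connection ID.
final case class Connections(outs: Map[String, Out] = Map.empty) {
  // What a Connect(connectionId, out) handler would do:
  // remember the out under its connection ID.
  def connect(connectionId: String, out: Out): Connections =
    copy(outs = outs + (connectionId -> out))

  // What a SourceDied(connectionId) handler would do: return the out to
  // notify (if it is known) together with the registry without that entry.
  def sourceDied(connectionId: String): (Option[Out], Connections) =
    (outs.get(connectionId), copy(outs = outs - connectionId))

  // Every out that should receive a broadcast message.
  def all: Iterable[Out] = outs.values
}

object ConnectionsDemo extends App {
  // One fresh ID per WebSocket connection.
  val idA = UUID.randomUUID().toString
  val idB = UUID.randomUUID().toString

  val both = Connections().connect(idA, Out("tab A")).connect(idB, Out("tab B"))
  val (closed, remaining) = both.sourceDied(idA)

  println(closed)               // the out registered under idA
  println(remaining.all.toList) // only the out registered under idB
}
```

With this shape, the ID would be generated once per connection, for example by calling websocketFlow(UUID.randomUUID().toString) where the flow is created, so that the same ID reaches both Connect and SourceDied.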