
How to wire input and output when creating a flow for a WebSocket?


Assuming I have an actor, UserActor, that knows how to handle incoming messages and how to send new ones, I want to handle WebSockets in Akka HTTP, so I am creating a Flow[Message, Message, NotUsed].

Here new messages arrive as JSON and are sent to UserActor. When the source completes, UserActor receives a SourceDied message:

    val incomingMessages: Sink[Message, NotUsed] =
      Flow[Message]
        .mapAsync(1) {
          case TextMessage.Strict(text)  => Future.successful(text)
          case TextMessage.Streamed(msg) => msg.runFold("")(_ + _)
        }
        .map(decode[IncomingMessage])
        .collect { case Right(msg) => msg }
        .map(_.toMessage)
        .to(Sink.actorRef[ChatMessage](userActor, SourceDied))

Here I register an out actor with my UserActor, to which it will send outgoing messages:

    val outgoingMessages: Source[Message, NotUsed] =
      Source
        .actorRef[ChatMessage](20, OverflowStrategy.fail)
        .mapMaterializedValue { outActor =>
          userActor ! Connect(outActor)
          NotUsed
        }
        .map((x: ChatMessage) => OutgoingMessage.fromMessage(x))
        .map((outMsg: OutgoingMessage) => TextMessage(outMsg.asJson.toString))

    Flow.fromSinkAndSource(incomingMessages, outgoingMessages)

However, there is one UserActor per user, and each user can have multiple sockets open simultaneously. So I collect the outs in a Set inside UserActor and send each message to all of them. This works well.

But when a source sends the terminating message (SourceDied in my case), I don't know which out that source was paired with, so I can't decide which out to notify about completion and then remove from my set of outs.


Solution

  • One idea is to change your Flow to take a unique identifier for each connection:

    def websocketFlow(connectionId: String): Flow[Message, Message, NotUsed] = {
      val incomingMessages: Sink[Message, NotUsed] =
        ...
        .to(Sink.actorRef[ChatMessage](userActor, SourceDied(connectionId)))
    
      val outgoingMessages: Source[Message, NotUsed] =
        Source
          .actorRef[ChatMessage](20, OverflowStrategy.fail)
          .mapMaterializedValue { outActor =>
            userActor ! Connect(connectionId, outActor)
            NotUsed
          }
          ...
    
      Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
    }
    

    You would need to adjust the SourceDied and Connect messages to include a connection ID (which could be generated with something like java.util.UUID.randomUUID.toString, for example). Then, in UserActor, replace the Set with a Map keyed by connection ID. Using a Map lets you look up connection actors and remove them as needed.
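The Map-based bookkeeping described above might look roughly like the following. This is only a sketch under assumptions: the field names on Connect and SourceDied, the Connections class, and the ConnectionId helper are all hypothetical, and Ref is a type parameter standing in for akka.actor.ActorRef so the snippet compiles without Akka on the classpath.

```scala
import java.util.UUID

// Hypothetical message shapes: the answer only says that Connect and
// SourceDied need to carry a connection ID; the field names are assumptions.
final case class Connect[Ref](connectionId: String, out: Ref)
final case class SourceDied(connectionId: String)

// Immutable bookkeeping that UserActor could hold in place of its Set of outs.
// Ref stands in for akka.actor.ActorRef in this sketch.
final case class Connections[Ref](outs: Map[String, Ref] = Map.empty[String, Ref]) {

  // Register (or overwrite) the out actor for a connection.
  def connect(msg: Connect[Ref]): Connections[Ref] =
    copy(outs = outs + (msg.connectionId -> msg.out))

  // Drop the connection and also return the removed out, if there was one,
  // so the caller can notify it about completion.
  def disconnect(msg: SourceDied): (Option[Ref], Connections[Ref]) =
    (outs.get(msg.connectionId), copy(outs = outs - msg.connectionId))

  // Every out that a broadcast should reach.
  def all: Iterable[Ref] = outs.values
}

object ConnectionId {
  // One fresh ID per WebSocket connection (hypothetical helper).
  def fresh(): String = UUID.randomUUID().toString
}
```

Inside UserActor, the receive block would presumably call connect on Connect, call disconnect on SourceDied, and then signal completion to the returned out before discarding it. Each new WebSocket connection would build its flow with something like websocketFlow(ConnectionId.fresh()).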