akka, akka-stream

How can I get the ActorRef from Source.actorRef()?


I want to send server-sent events (SSE) to a web app. I think I have all the SSE plumbing up and running. I now need to create a Source on the Akka HTTP side of the house.

I found you can do something like this:

val source = Source.actorRef(5, akka.stream.OverflowStrategy.dropTail)

What I want to do is somehow "publish" to this source, presumably by sending a message to an actor. I see from the docs that this call creates a Source<T, ActorRef>.

How can I get this ActorRef instance so I can send messages to it?


Solution

  • To obtain the materialized ActorRef from Source.actorRef, the stream has to be running. For example, let's say that you want to send the SSE payload data (in the form of a String) to this actor, which converts that data to ServerSentEvent objects to send to the client. You could do something like:

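    import scala.concurrent.duration._
    import akka.http.scaladsl.model.sse.ServerSentEvent
    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{BroadcastHub, Keep, Source}

    // An implicit Materializer (or, in Akka 2.6+, an implicit ActorSystem) must be in scope.
    // Materializes to (ActorRef, Source[ServerSentEvent, NotUsed]).
    val (actor, sseSource) =
      Source.actorRef[String](5, OverflowStrategy.dropTail)
        .map(s => ServerSentEvent(s)) // e.g. wrap each String payload in a ServerSentEvent
        .keepAlive(1.second, () => ServerSentEvent.heartbeat)
        .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
        .run()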
    

    Now you can send messages to the materialized actor:

    actor ! "quesadilla"
    

    And use sseSource in your route:

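    // Brings the marshaller for Source[ServerSentEvent, _] into scope.
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    path("events") {
      get {
        complete(sseSource)
      }
    }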
    

    Note that there is no backpressure with this approach: messages sent to the actor are fire-and-forget, and once the 5-element buffer is full, elements are dropped according to the overflow strategy.
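
If the producer needs to know whether an element was accepted, one possible alternative (not part of the answer above) is Source.queue, whose offer method returns a Future that completes once the element has been handled. The following is a minimal sketch, assuming Akka 2.6+ (where an implicit ActorSystem is enough to run a stream). The names QueueSketch and publishAll are hypothetical, and Sink.seq stands in for the SSE pipeline so the result can be inspected.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not taken from the original answer.
object QueueSketch {

  // Pushes every payload through a backpressured queue and returns
  // what the downstream sink received.
  def publishAll(payloads: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("queue-sketch")
    import system.dispatcher

    // Materializes to (SourceQueueWithComplete[String], Future[Seq[String]]).
    val (queue, received) =
      Source.queue[String](5, OverflowStrategy.backpressure)
        .toMat(Sink.seq[String])(Keep.both)
        .run()

    // Offer one element at a time: each offer starts only after the
    // previous offer's Future has completed.
    val offered: Future[Unit] =
      payloads.foldLeft(Future.successful(())) { (previous, payload) =>
        previous.flatMap(_ => queue.offer(payload)).map(_ => ())
      }

    // Complete the stream once every element has been offered.
    offered.foreach(_ => queue.complete())

    try Await.result(received, 5.seconds)
    finally system.terminate()
  }
}
```

Chaining the offers through the returned Future is what lets the producer slow down when the buffer is full, in contrast to `actor ! "quesadilla"`, which returns immediately whether or not the element is kept.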