I want to send server-sent events (SSE) to a web app. I think I have all the SSE plumbing up and running. I now need to create a Source on the Akka HTTP side of the house.
I found you can do something like this:
    val source = Source.actorRef(5, akka.stream.OverflowStrategy.dropTail)
What I want to do is somehow "publish" to this source, presumably by sending an actor a message. I see from the docs that this call creates Source<T,ActorRef>.
How can I get this ActorRef instance so I can send messages to it?
To obtain the materialized ActorRef from Source.actorRef, the stream has to be running. For example, let's say that you want to send the SSE payload data (in the form of a String) to this actor, which converts that data to ServerSentEvent objects to send to the client. You could do something like:
    val (actor, sseSource) =
      Source.actorRef[String](5, akka.stream.OverflowStrategy.dropTail)
        .map(s => ServerSentEvent(s)) // e.g. wrap each String payload in a ServerSentEvent
        .keepAlive(1.second, () => ServerSentEvent.heartbeat)
        .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
        .run()
    // actor: ActorRef, sseSource: Source[ServerSentEvent, NotUsed]
Now you can send messages to the materialized actor:
    actor ! "quesadilla"
And use sseSource in your route:
    path("events") {
      get {
        complete(sseSource)
      }
    }
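For reference, here is a minimal sketch that puts the pieces above into one file, including the imports the snippets rely on. It assumes Akka 2.6+ (where the implicit ActorSystem also supplies the materializer) and Akka HTTP 10.2+ (for newServerAt). The object name SseServer, the toEvent helper, and the host and port are made up for illustration. The EventStreamMarshalling import is what allows complete to accept a Source of ServerSentEvent.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.server.Directives._
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{BroadcastHub, Keep, Source}
import scala.concurrent.duration._

object SseServer {
  // Hypothetical helper: wraps a String payload in a ServerSentEvent.
  def toEvent(payload: String): ServerSentEvent = ServerSentEvent(payload)

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, so no explicit ActorMaterializer is needed.
    implicit val system: ActorSystem = ActorSystem("sse")

    // Running the stream materializes both the ActorRef and a reusable Source.
    // Note: this two-argument actorRef overload is deprecated in Akka 2.6
    // but still compiles (with a warning).
    val (actor, sseSource) =
      Source.actorRef[String](5, OverflowStrategy.dropTail)
        .map(toEvent)
        .keepAlive(1.second, () => ServerSentEvent.heartbeat)
        .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
        .run()

    val route =
      path("events") {
        get {
          complete(sseSource)
        }
      }

    // Assumed host and port; adjust to your deployment.
    Http().newServerAt("localhost", 8080).bind(route)

    // Publish by sending a message to the materialized actor.
    actor ! "quesadilla"
  }
}
```

Because the source comes from a BroadcastHub, the same sseSource value can be handed to every request that hits the route.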
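Note that there is no backpressure with this approach: messages to the actor are fire-and-forget. When the 5-element buffer is full, OverflowStrategy.dropTail drops the youngest buffered element to make room for the new one, and the sender is not told about it.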
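If the producer needs feedback, one possible alternative (not part of the approach above) is Source.queue, whose offer method returns a Future[QueueOfferResult]. The following is only a sketch under the same Akka 2.6+ assumption; the object name QueueSketch and the describe helper are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{BroadcastHub, Keep, Source}

object QueueSketch {
  // Hypothetical helper: turns an offer result into a log-friendly string.
  def describe(result: QueueOfferResult): String = result match {
    case QueueOfferResult.Enqueued    => "enqueued"
    case QueueOfferResult.Dropped     => "dropped"
    case QueueOfferResult.QueueClosed => "queue closed"
    case QueueOfferResult.Failure(e)  => s"failed: ${e.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("queue-sketch")
    import system.dispatcher // ExecutionContext for the Future callback

    // Materializes a queue handle instead of an ActorRef.
    val (queue, sseSource) =
      Source.queue[String](5, OverflowStrategy.backpressure)
        .map(s => ServerSentEvent(s))
        .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
        .run()

    // offer returns a Future, so the producer can wait for or react to the outcome.
    queue.offer("quesadilla").foreach(result => println(describe(result)))

    // sseSource would be passed to complete(...) in the route, as before.
  }
}
```

The trade-off is that callers now deal with a Future per message instead of a plain tell.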