I tried: EventStream -> Source -> Akka HTTP (SSE)
As I see it, this cannot work, because the source will be materialized by Akka HTTP complete(Source, ...)
and to send messages from the EventStream to the materialized Source I need the ActorRef (is there a way to get that ActorRef?)
I've found a solution on GitHub which uses ActorPublisher: https://github.com/calvinlfer/Akka-HTTP-Akka-Streams-Akka-Actors-Integration
But since ActorPublisher is an internal API, I am still hoping for clean solution.
You can use Source.actorRef
to create a Source
that converts event stream elements to ServerSentEvent
instances, and BroadcastHub.sink
, in a manner like the following:
val (sseActor, sseSource) =
Source.actorRef[EventStreamMessageOrWhatever](10, akka.stream.OverflowStrategy.dropTail)
.map(s => /* convert event stream elements to ServerSideEvent */)
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
.toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
.run()
If there is downstream demand, messages (i.e., event stream elements) sent to the materialized ActorRef
are emitted downstream. If there is no downstream demand, the messages are buffered up to a certain number (in this example, the buffer size is 10) with the specified overflow strategy.
You can then subscribe the materialized actor to the EventStream
:
eventStream.subscribe(sseActor, ...)
And the materialized Source
is available for use in your path:
path("sse") {
get {
complete(sseSource)
}
}
Note that there is no backpressure with this approach.