Search code examples
akkaakka-streamakka-http

How can I create Server-side events from an EventStream in Akka?


I tried: EventStream -> Source -> Akka HTTP (SSE)

As I see it, this cannot work, because the source will be materialized by Akka HTTP complete(Source, ...) and to send messages from the EventStream to the materialized Source I need the ActorRef (is there a way to get that ActorRef?)


I've found a solution on GitHub which uses ActorPublisher: https://github.com/calvinlfer/Akka-HTTP-Akka-Streams-Akka-Actors-Integration

But since ActorPublisher is an internal API, I am still hoping for clean solution.


Solution

  • You can use Source.actorRef to create a Source that converts event stream elements to ServerSentEvent instances, and BroadcastHub.sink, in a manner like the following:

    val (sseActor, sseSource) =
      Source.actorRef[EventStreamMessageOrWhatever](10, akka.stream.OverflowStrategy.dropTail)
        .map(s => /* convert event stream elements to ServerSideEvent */)
        .keepAlive(1.second, () => ServerSentEvent.heartbeat)
        .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
        .run()
    

    If there is downstream demand, messages (i.e., event stream elements) sent to the materialized ActorRef are emitted downstream. If there is no downstream demand, the messages are buffered up to a certain number (in this example, the buffer size is 10) with the specified overflow strategy.

    You can then subscribe the materialized actor to the EventStream:

    eventStream.subscribe(sseActor, ...)
    

    And the materialized Source is available for use in your path:

    path("sse") {
      get {
        complete(sseSource)
      }
    }
    

    Note that there is no backpressure with this approach.