Tags: scala, akka, akka-stream, akka-http

Stream paginated API response over WebSocket using Akka


On the server side, I'm consuming an HTTP API that returns its results in pages: each response contains x results, and as long as it returned more than 0 I can call the API again to get the next x. x can be chosen arbitrarily, up to the API's maximum page size.

Now I want to stream the full set of results efficiently over a WebSocket without overwhelming it (i.e. with backpressure applied). Initially I built up the entire result set and then created a Source from it:

getEventsFuture().foreach { events =>
  sender ! Flow.fromSinkAndSource(Sink.ignore, Source(events))
}

This works and the WebSocket client receives all events at its max speed. The big downside of this is that I have to fetch all pages before starting to return data to my client. Ideally I'd use a smaller page size and start returning results to the client as soon as they connect, fetching the next pages in the process.

So I need a flow with a source I can add data to after the flow has been materialized. I tried using Source.actorRef for this:

val events = Source.actorRef[Event](1000, OverflowStrategy.fail).mapMaterializedValue { outActor =>
  sendEvents(outActor)
  NotUsed
}

sender ! Flow.fromSinkAndSource(Sink.ignore, events)

Essentially I take the materialized ActorRef and send all events to it: every time a page is fetched, I dump its results to the actor. As my initialization of the Source probably already tells you, this doesn't always work. Sometimes, when the response is large enough and the client consumes more slowly than usual, the socket connection gets closed. I feel OverflowStrategy.fail is the correct strategy, as opposed to dropping events, because I don't want the client to think they got everything when that wasn't the case.

I have no sane value to set for the buffer size up front, and I don't want to set Int.MaxValue or similar because I believe Akka allocates the full buffer memory internally.

How could I solve this? I want all events delivered to the client as fast as possible, with proper backpressure as in the first example.

Once the first page is fetched I do know how many results there will be in total, so I could fetch a small page up front and set the buffer size to the full result count, but that seems like a workaround.


Solution

  • I found unfoldAsync to be perfect for this use case.

    Signature

    def unfoldAsync[S, E](s: S)(f: S ⇒ Future[Option[(S, E)]]): Source[E, NotUsed]
    

    Description

    Just like unfold but the fold function returns a Future which will cause the source to complete or emit when it completes.

    emits when there is demand and unfold state returned future completes with some value

    completes when the future returned by the unfold function completes with an empty value
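Applied to the question, this could look roughly like the sketch below. It assumes a hypothetical `fetchPage(offset)` that wraps the real paginated HTTP call and an `Event` type from the original code; the unfold state is simply the next offset to request:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}

// Hypothetical: the real paginated HTTP call. Returns the events at
// `offset`, or an empty Seq once the API is exhausted.
def fetchPage(offset: Int): Future[Seq[Event]] = ???

// State is the next offset to fetch. Completing the Future with None
// completes the source; Some((nextState, elem)) emits `elem` and
// continues unfolding from `nextState` -- only when there is demand,
// so backpressure reaches all the way back to the page fetches.
val events: Source[Event, NotUsed] =
  Source
    .unfoldAsync(0) { offset =>
      fetchPage(offset).map { page =>
        if (page.isEmpty) None                    // no more pages: complete
        else Some((offset + page.size, page))     // emit this page, advance
      }
    }
    .mapConcat(identity)  // flatten each Seq[Event] page into single events

sender ! Flow.fromSinkAndSource(Sink.ignore, events)
```

Because the next page is only requested when the downstream demands more elements, no unbounded buffer is needed and the client receives results as soon as the first page arrives.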