Search code examples
scalaakkaakka-streamakka-httpreactive-streams

Complete request with latest item in a flow


I'd like to complete a GET request with the latest available item in a flow. This flow in particular aggregates events produced by an actor and already individually consumed by a WebSocket.

Let's say that the event can be represented as follows:

final case class Event(id: String, value: Double)

The first thing I do is creating a SourceQueue where the actor will push the events and a hub so that different clients can receive these events independently:

val (queue, hub) =
  Source.queue[Event](256, OverflowStrategy.dropHead).
    toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both).run()

I'm then able to create an actor which can push events to queue and pass hub to a service which serves the events via a WebSocket:

extractUpgradeToWebSocket { upgrade =>
  complete(upgrade.handleMessagesWithSinkSource(
    inSink = Sink.ignore,
    outSource =
      hub.map(in => TextMessage(fmt.write(in).toString()))
  ))
}

This works fine, also with multiple consumers at the same time.

What I would like to do next is have a service which consumes the events from hub and produces a list of the latest event per id, serving it via a GET endpoint.

I tried several approaches to solve this one. The two approaches I attempted were:

  • run a flow that updates a private variable
  • complete with a sink that returns the last element

Run a flow that updates a private variable

This is the last approach I tried, actually. The weird (or is it?) think I noticed is that actually nothing gets logged (shouldn't thing that go through the log combinator be logged?).

The result using this approach is that latest is alway null and the response is thus always empty.

final class Service(hub: Source[Event, NotUsed])(implicit s: ActorSystem, m: ActorMaterializer, t: Timeout) extends Directives with JsonSupport {

  implicit private val executor = system.dispatcher

  @volatile private[this] var latest: List[Event] = _

  hub.
    log("hub", identity).
    groupBy(Int.MaxValue, { case Event(id, _) => id }).
    map { case event @ Event(id, _) => Map(id -> event) }.
    reduce(_ ++ _).
    mergeSubstreams.
    map(_.values.toList).
    toMat(Sink.foreach(latest = _))(Keep.none).run()

  val definition = get { complete(Option(latest)) }

}

I also tried a similar approach that uses a "box" actor and pipes the aggregates to it, but the effect is the same.

Complete with a sink that returns the last element

This is the first approach I tried to take. The effect is that the response hangs until a timeout is reached and Akka HTTP returns a 500 to the browser.

final class Service(hub: Source[Event, NotUsed])(implicit s: ActorSystem, m: ActorMaterializer, t: Timeout) extends Directives with JsonSupport {

  implicit private val executor = system.dispatcher
  private[this] val currentLocations =
    hub.
      groupBy(Int.MaxValue, { case Event(id, _) => id }).
      map { case event @ Event(id, _) => Map(id -> event) }.
      reduce(_ ++ _).
      mergeSubstreams.
      map(_.values.toList).
      runWith(Sink.reduce((_, next) => next))

  val definition = get { complete(currentLocations) }

}

Solution

  • ActorRef As A Sink

    You can create an Actor that keeps a running Map of id to Event:

    import scala.collection.immutable
    
    object QueryMap
    
    class MapKeeperActor() extends Actor {
    
      var internalMap = immutable.Map.empty[String, Event]
    
      override def receive = {
        case e : Event    => internalMap = internalMap + (e.id -> e)
        case _ : QueryMap => sender ! internalMap
      }
    }
    

    This ref can then be used within a Sink which will be attached to the BroadcastHub:

    object OnCompleteMessage
    
    val system : ActorSystem = ???
    
    val mapKeeperRef = system.actorOf(Props[MapKeeperActor])
    
    val mapKeeperSink : Sink[Event, _] = Sink.actorRef[Event](mapKeeperRef, OnCompleteMessage)
    

    Query Actor in Route

    We can now create a Route which will query the map keeper using Directives. However, you will have to decide how to serialize the Map into a ResponseEntity for the HttpResponse:

    val serializeMap : Map[String, Event] => ResponseEntity = ???
    
    val route = 
      get {
        onComplete( (mapKeeperRef ? QueryMap).mapTo[Map[String, Event]]) {
          case Success(map) => complete(HttpResponse(entity=serializeMap(map))
          case Failure(ex)  => complete((InternalServerError, s"An error occurred: ${ex.getMessage}"))
        }
      }