I'd like to complete a GET request with the latest available item in a flow. This flow in particular aggregates events produced by an actor and already individually consumed by a WebSocket.
Let's say that the event can be represented as follows:
final case class Event(id: String, value: Double)
The first thing I do is create a SourceQueue where the actor will push the events, and a hub so that different clients can receive these events independently:
val (queue, hub) =
  Source.queue[Event](256, OverflowStrategy.dropHead).
    toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both).run()
I'm then able to create an actor which can push events to queue and pass hub to a service which serves the events via a WebSocket:
extractUpgradeToWebSocket { upgrade =>
  complete(upgrade.handleMessagesWithSinkSource(
    inSink = Sink.ignore,
    outSource =
      hub.map(in => TextMessage(fmt.write(in).toString()))
  ))
}
This works fine, also with multiple consumers at the same time.
What I would like to do next is have a service which consumes the events from hub and produces a list of the latest event per id, serving it via a GET endpoint.
I tried two approaches to solve this:
last element
This is the last approach I tried, actually. The weird (or is it?) thing I noticed is that nothing gets logged (shouldn't things that go through the log combinator be logged?).
The result of this approach is that latest is always null and the response is thus always empty.
final class Service(hub: Source[Event, NotUsed])(implicit s: ActorSystem, m: ActorMaterializer, t: Timeout) extends Directives with JsonSupport {
  implicit private val executor = s.dispatcher
  @volatile private[this] var latest: List[Event] = _
  hub.
    log("hub", identity).
    groupBy(Int.MaxValue, { case Event(id, _) => id }).
    map { case event @ Event(id, _) => Map(id -> event) }.
    reduce(_ ++ _).
    mergeSubstreams.
    map(_.values.toList).
    toMat(Sink.foreach(latest = _))(Keep.none).run()
  val definition = get { complete(Option(latest)) }
}
I also tried a similar approach that uses a "box" actor and pipes the aggregates to it, but the effect is the same.
last element
This is the first approach I tried to take. The effect is that the response hangs until a timeout is reached and Akka HTTP returns a 500 to the browser.
final class Service(hub: Source[Event, NotUsed])(implicit s: ActorSystem, m: ActorMaterializer, t: Timeout) extends Directives with JsonSupport {
  implicit private val executor = s.dispatcher
  private[this] val currentLocations =
    hub.
      groupBy(Int.MaxValue, { case Event(id, _) => id }).
      map { case event @ Event(id, _) => Map(id -> event) }.
      reduce(_ ++ _).
      mergeSubstreams.
      map(_.values.toList).
      runWith(Sink.reduce((_, next) => next))
  val definition = get { complete(currentLocations) }
}
ActorRef As A Sink
You can create an Actor that keeps a running Map of id to Event:
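import akka.actor.Actor
import scala.collection.immutable

// Message asking the actor for its current map.
case object QueryMap

class MapKeeperActor extends Actor {
  // Latest event seen so far for each id.
  var internalMap = immutable.Map.empty[String, Event]

  override def receive: Receive = {
    case e: Event => internalMap = internalMap + (e.id -> e)
    // QueryMap is an object, so match on the value rather than on a type.
    case QueryMap => sender() ! internalMap
  }
}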
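The actor's update rule is a plain left fold over the events, so it can be checked without Akka. A minimal sketch, assuming only the Event shape from the question; `LatestPerId`, `update`, and `latestPerId` are names made up for illustration:

```scala
final case class Event(id: String, value: Double)

// Hypothetical helper, not part of Akka: mirrors the actor's state update.
object LatestPerId {
  // Same step the actor performs per message:
  // a newer event overwrites an older one with the same id.
  def update(latest: Map[String, Event], e: Event): Map[String, Event] =
    latest + (e.id -> e)

  // Folding a finite batch of events yields the latest event per id.
  def latestPerId(events: Seq[Event]): Map[String, Event] =
    events.foldLeft(Map.empty[String, Event])(update)
}
```

For example, folding Event("a", 1.0), Event("b", 2.0), Event("a", 3.0) leaves Event("a", 3.0) and Event("b", 2.0) in the map.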
A ref to this actor can then be used within a Sink which will be attached to the BroadcastHub:
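case object OnCompleteMessage

val system: ActorSystem = ???
val mapKeeperRef = system.actorOf(Props[MapKeeperActor])
val mapKeeperSink: Sink[Event, _] = Sink.actorRef[Event](mapKeeperRef, OnCompleteMessage)

// Attach the sink to the hub from the question (assumes an implicit materializer is in scope).
hub.runWith(mapKeeperSink)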
Query Actor in Route
We can now create a Route which will query the map keeper using Directives. However, you will have to decide how to serialize the Map into a ResponseEntity for the HttpResponse:
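import akka.http.scaladsl.model.{HttpResponse, ResponseEntity}
import akka.http.scaladsl.model.StatusCodes.InternalServerError
import akka.http.scaladsl.server.Directives._
import akka.pattern.ask
import scala.util.{Failure, Success}

// The ask (`?`) also needs an implicit akka.util.Timeout in scope.
val serializeMap: Map[String, Event] => ResponseEntity = ???

val route =
  get {
    onComplete((mapKeeperRef ? QueryMap).mapTo[Map[String, Event]]) {
      case Success(map) => complete(HttpResponse(entity = serializeMap(map)))
      case Failure(ex)  => complete((InternalServerError, s"An error occurred: ${ex.getMessage}"))
    }
  }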
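As for why both attempts in the question never produce a value: reduce (and Sink.reduce) only emits once its upstream completes, and a hub fed by a queue that stays open does not complete, so the aggregate never appears. A running aggregate that emits after every element avoids this, and Akka Streams has a scan operator of that shape. The sketch below shows the idea with a plain Scala Iterator rather than Akka Streams; `RunningLatest` and `snapshots` are illustrative names, not library APIs:

```scala
final case class Event(id: String, value: Double)

// Hypothetical helper for illustration; uses only the standard library.
object RunningLatest {
  // Emits one snapshot per incoming event instead of a single value at the end,
  // so it also works when the input never terminates.
  def snapshots(events: Iterator[Event]): Iterator[Map[String, Event]] =
    events
      .scanLeft(Map.empty[String, Event])((acc, e) => acc + (e.id -> e))
      .drop(1) // skip the initial empty map
}
```

Calling RunningLatest.snapshots(...).take(3) on an endless iterator returns three snapshots, whereas a reduce over the same input would never return. Separately, log reports elements at debug level by default, so they only show up when debug logging is enabled.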