I want to monitor the lifecyle of an akka-stream, it seems like monitor would do what I need, but my monitoring function is async, returning a Future
, so I would need monitor to be async as well.
monitor
has the following signature:
def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2]
But I would need something like:
def monitorAsync[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Future[Mat2]): ReprMat[Out, Mat2]
Is there a way to implement this using akka-streams primitives like mapAsync
.
I suppose I could use mapAsync
+ watchTermination
but seems like a complicated solution when monitor
almost does what I need.
Turns out monitor
was not what I wanted at all, as a would only have access to FlowMonitor
after the materialization of the stream.
I ended up implementing this with mapAsync
and recover
. I am simplifying here, but something like this:
val monitor = new Monitor {
def onNext: Future[Unit] = ???
def onFailure(cause: Throwable): Future[Unit] = ???
def onFinish: Future[Unit] = ???
}
source.mapAsync { v =>
monitor.onNext.map(_ => v)
}.watchTermination() { (mat, doneF) =>
doneF.flatMap(_ => monitor.onFinish).recoverWith( case ex => monitor.onFailure(ex))
}