
Monitoring the lifecycle of an Akka stream


I want to monitor the lifecycle of an akka-stream. It seems like monitor would do what I need, but my monitoring function is async and returns a Future, so I would need monitor to be async as well.

monitor has the following signature:

def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2]

But I would need something like:

def monitorAsync[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Future[Mat2]): ReprMat[Out, Mat2]

Is there a way to implement this using akka-streams primitives like mapAsync?

I suppose I could use mapAsync + watchTermination, but that seems like a complicated solution when monitor almost does what I need.
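
For context, here is a minimal sketch of how the synchronous monitor operator is typically used. It assumes Akka 2.6, where an implicit ActorSystem supplies the materializer and monitorMat(combine) is the non-deprecated spelling of monitor()(combine); the object name MonitorSketch and the sample source are made up for illustration. The point it shows is that the FlowMonitor arrives as part of the materialized value and can only be polled for a state snapshot.

```scala
import akka.actor.ActorSystem
import akka.stream.FlowMonitorState
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MonitorSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, so the implicit system also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("monitor-sketch")

    // The FlowMonitor is part of the materialized value, so it exists only after run().
    val (flowMonitor, done) =
      Source(1 to 3)
        .monitorMat(Keep.right)
        .toMat(Sink.ignore)(Keep.both)
        .run()

    Await.ready(done, 5.seconds)

    // state is a synchronous snapshot; there is no callback or Future to hook into.
    flowMonitor.state match {
      case FlowMonitorState.Initialized   => println("no element seen yet")
      case FlowMonitorState.Received(msg) => println(s"last element: $msg")
      case FlowMonitorState.Failed(cause) => println(s"failed: $cause")
      case FlowMonitorState.Finished      => println("finished")
    }
    system.terminate()
  }
}
```

Because the combine function runs once at materialization time, making it return a Future would not by itself produce per-element or termination callbacks.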


Solution

  • It turns out monitor was not what I wanted at all, as I would only have access to the FlowMonitor after the materialization of the stream.

    I ended up implementing this with mapAsync and watchTermination, recovering on the termination future. I am simplifying here, but something like this:

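    // Monitor is a user-defined trait; an implicit ExecutionContext must be in scope.
    val monitor = new Monitor {
      def onNext: Future[Unit] = ???
      def onFailure(cause: Throwable): Future[Unit] = ???
      def onFinish: Future[Unit] = ???
    }

    // mapAsync requires a parallelism argument; 1 runs the hooks one at a time.
    source.mapAsync(parallelism = 1) { v =>
      // Emit v downstream only after the async hook has completed.
      monitor.onNext.map(_ => v)
    }.watchTermination() { (mat, doneF) =>
      // doneF succeeds when the stream completes and fails with the stream's error.
      doneF.flatMap(_ => monitor.onFinish).recoverWith { case ex => monitor.onFailure(ex) }
    }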
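
    The snippet above leaves out the Monitor trait, the execution context, and the materialization step. A self-contained sketch of the same idea could look like the following. It assumes Akka 2.6 (an implicit ActorSystem supplies the materializer); the Monitor trait shape, the runMonitored helper, and the logging implementation are hypothetical names used only for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical monitoring interface, mirroring the shape used in the answer.
trait Monitor {
  def onNext: Future[Unit]
  def onFailure(cause: Throwable): Future[Unit]
  def onFinish: Future[Unit]
}

object LifecycleSketch {
  // Runs the source into Sink.ignore and calls the monitor hooks along the way.
  // The returned Future completes once the onFinish or onFailure hook has completed.
  def runMonitored[T](source: Source[T, NotUsed], monitor: Monitor)(
      implicit system: ActorSystem): Future[Unit] = {
    implicit val ec: ExecutionContext = system.dispatcher
    source
      // Each element is emitted only after its onNext hook has completed.
      .mapAsync(parallelism = 1)(v => monitor.onNext.map(_ => v))
      .watchTermination() { (_, done) =>
        // Note: recoverWith also catches a failure of onFinish itself.
        done.flatMap(_ => monitor.onFinish).recoverWith { case ex => monitor.onFailure(ex) }
      }
      .to(Sink.ignore) // keeps the Future[Unit] from watchTermination as materialized value
      .run()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("lifecycle-sketch")

    // Stand-in implementation that only prints; a real one might call a remote service.
    val logging = new Monitor {
      def onNext: Future[Unit] = Future.successful(println("element seen"))
      def onFailure(cause: Throwable): Future[Unit] =
        Future.successful(println(s"stream failed: ${cause.getMessage}"))
      def onFinish: Future[Unit] = Future.successful(println("stream finished"))
    }

    Await.ready(runMonitored(Source(1 to 3), logging), 5.seconds)
    system.terminate()
  }
}
```

    With parallelism = 1 the onNext hooks run one at a time, which also means a slow hook backpressures the stream. A failing onNext Future fails the stream, so it ends up in onFailure as well.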