
How to log flow rate in Akka Stream?


I have an Akka Stream application with a single flow/graph. I want to measure the flow rate at the source and log it every 5 seconds, e.g. 'received 3 messages in the last 5 seconds'. I tried:

someOtherFlow
  .groupedWithin(Int.MaxValue, 5.seconds)
  .runForeach(seq =>
    log.debug(s"received ${seq.length} messages in the last 5 seconds")
  )

But this only logs when messages arrive; nothing is emitted for a window with 0 messages, not even an empty list. I want the 0s logged as well. Is this possible?


Solution

  • You could try something like

      src
        .conflateWithSeed(_ => 1){ case (acc, _) => acc + 1 }
        .zip(Source.tick(5.seconds, 5.seconds, NotUsed))
        .map(_._1)
    

    which should batch your elements until the tick releases them. This is inspired by an example in the docs.

    On a different note, if you need this for monitoring, you could use a third-party tool such as Kamon.
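
One caveat on the `zip` approach: `zip` emits only when both inputs have an element, so a tick that fires while no message has arrived is likely held until the next message instead of producing a 0. A possible alternative, sketched below, keeps the `groupedWithin` from the question but feeds it a heartbeat so that every window contains at least one element. This is a minimal sketch, not a tested production solution: the object `FlowRateSketch`, the helper `countsPerWindow`, and its `window` and `heartbeat` parameters are hypothetical names, and it assumes `heartbeat` is shorter than `window`.

```scala
import akka.stream.scaladsl.Source

import scala.concurrent.duration.FiniteDuration

object FlowRateSketch {

  // Hypothetical helper: turns any source into a source of per-window message
  // counts. Assumes `heartbeat` is shorter than `window`, so an idle window
  // still contains at least one injected element and therefore gets emitted.
  def countsPerWindow[T, M](
      src: Source[T, M],
      window: FiniteDuration,
      heartbeat: FiniteDuration
  ): Source[Int, M] =
    src
      .map(_ => 1)                         // every real message counts as 1
      .keepAlive(heartbeat, () => 0)       // inject a 0 when upstream is idle for `heartbeat`
      .groupedWithin(Int.MaxValue, window) // collect everything seen during one window
      .map(_.sum)                          // injected 0s add nothing, so the sum is the message count
}
```

With the names from the question, this could be used as `FlowRateSketch.countsPerWindow(src, 5.seconds, 1.second).runForeach(n => log.debug(s"received $n messages in the last 5 seconds"))`. Unlike merging in a separate `Source.tick`, `keepAlive` completes when the upstream completes, so the counting stream does not outlive the source.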