Load bar with Akka streaming


Is it possible to build some kind of loading bar with Akka Streams? I'm looking for something that reports the progress of a source.

Source via loadingBar(expectedElement) via someThingElse to Sink

where expectedElement is the number of elements that should pass through when "100%" is achieved.


Solution

  • If you're looking to monitor the progress from outside the stream then I recommend using an Agent. The Agent will store the running count:

    import akka.agent.Agent
    import akka.stream.scaladsl.Flow
    
    def runningCountFlow[T](agent: Agent[Float]) = Flow[T].map { elem =>
      agent send (_ + 1.0f) // asynchronously increment the running count
      elem                  // pass the element through unchanged
    }
    

    You can then monitor the agent outside of the stream:

    import scala.concurrent.ExecutionContext.Implicits.global
    
    val expectedCount = 42.toFloat
    val countAgent = Agent(0.0f)
    
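    type DataType = ??? // placeholder for the stream's element type
    
    // Source, someThingElse and Sink stand in for your own stages
    val stream = 
      Source via runningCountFlow[DataType](countAgent) via someThingElse runWith Sink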
    
    Thread.sleep(5000) //let the stream run for a while
    
    // Agent updates are asynchronous, so this value may lag slightly behind the stream
    val percentComplete = 100 * countAgent.get / expectedCount
    
    println(s"stream is $percentComplete% complete")
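
The akka-agent module was deprecated in Akka 2.5 and removed in 2.6, so on newer versions the same counting idea needs a different holder for the running count. Below is a minimal sketch of one possible substitute, assuming a plain `java.util.concurrent.atomic.AtomicLong` is acceptable. `ProgressTracker`, `record`, `percent` and `ProgressTrackerDemo` are hypothetical names invented for this illustration and are not part of Akka; a plain `Iterator` stands in for the stream so the example runs without any Akka dependency.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper (not part of Akka): counts elements and reports progress.
final class ProgressTracker(expectedCount: Long) {
  require(expectedCount > 0, "expectedCount must be positive")

  // Thread-safe running count, readable from outside the stream.
  private val seen = new AtomicLong(0L)

  // Counts one element and returns it unchanged, so it fits inside a map stage.
  def record[T](elem: T): T = {
    seen.incrementAndGet()
    elem
  }

  // Percentage of the expected elements seen so far
  // (exceeds 100 if more elements arrive than expected).
  def percent: Double = 100.0 * seen.get() / expectedCount
}

object ProgressTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new ProgressTracker(expectedCount = 42L)

    // A plain Iterator stands in for the stream in this sketch.
    Iterator.range(0, 21).foreach(i => tracker.record(i))

    println(f"stream is ${tracker.percent}%.1f%% complete")
  }
}
```

In an Akka Streams pipeline the same call would presumably sit inside a stage such as `Flow[T].map(elem => tracker.record(elem))`, and any other thread could then read `tracker.percent` while the stream runs.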