Is it possible to have some kind of loading bar with Akka streaming? I'm looking for something that may give the advancement state of a source.
Source via loadingBar(expectedElment) via someThingElse to Sink
where expectedElement
is the number of elements that should pass through when "100%" is achieved.
If you're looking to monitor the progress from outside the stream then I recommend using an Agent
. The Agent will store the running count:
import akka.agent.Agent
import akka.stream.scaladsl.Flow
def runningCountFlow[T](agent : Agent[Float]) = Flow[T].map { val =>
agent send (_ + 1.0f)
val
}
You can then monitor the agent outside of the stream:
import scala.concurrent.ExecutionContext.Implicits.global
val expectedCount = 42.toFloat
val countAgent = Agent(0.0f)
type DataType = ???
val stream =
Source via runningCountFlow[DataType](countAgent) via someThingElse runWith Sink
Thread.sleep(5000) //let the stream run for a while
val percentComplete = countAgent.get / expectedCount
println(s"stream is $percentComplete complete")