I would like to create a graph that loops n times before going to the sink. I've created this sample, which fulfills my requirements but doesn't terminate after reaching the sink, and I really don't understand why. Can someone enlighten me?
Thanks.
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, UniformFanOutShape}
import scala.concurrent.Future
object test {
  def main(args: Array[String]) {
    val ignore: Sink[Any, Future[Unit]] = Sink.ignore
    val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
      sink => {
        import FlowGraph.Implicits._
        val fileSource = Source.single((0, Array[String]()))
        val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
        val afterMerge = Flow[(Int, Array[String])].map {
          e =>
            println("after merge")
            e
        }
        val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
        val toRetry = Flow[(Int, Array[String])].filter {
          case (r, s) => {
            println("retry " + (r < 3) + " " + r)
            r < 3
          }
        }.map {
          case (r, s) => (r + 1, s)
        }
        val toSink = Flow[(Int, Array[String])].filter {
          case (r, s) => {
            println("sink " + (r >= 3) + " " + r)
            r >= 3
          }
        }
        merge.preferred <~ toRetry <~ broadcastArray
        fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
      }
    }
    implicit val system = ActorSystem()
    implicit val _ = ActorMaterializer()
    val run: Future[Unit] = closed.run()
    import system.dispatcher
    run.onComplete {
      case _ => {
        println("finished")
        system.shutdown()
      }
    }
  }
}
The Stream is never completed because the merge never signals completion.
After formatting your graph structure, it basically looks like:
//ignoring the preferred which is inconsequential
fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
merge <~ toRetry <~ broadcastArray
The problem of non-completion is rooted in your merge step:
// 2 inputs into merge
fileSource ~> merge
merge <~ toRetry
Once the fileSource has emitted its single element (namely (0, Array.empty[String])), it sends out a complete message to merge.
However, the fileSource's completion message gets blocked at the merge. From the documentation:
akka.stream.scaladsl.MergePreferred
Completes when all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
The merge will not send out complete until all of its input streams have completed.
// fileSource is complete ~> merge
// merge <~ toRetry is still running
// complete fileSource + still running toRetry = still running merge
Therefore, merge will wait until toRetry also completes. But toRetry will never complete because it is waiting for merge to complete.
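The rule behind this deadlock can be modelled without Akka. The following is a minimal sketch in plain Scala, not Akka's implementation: `MergeCompletionModel` and `mergeCompletes` are hypothetical names that only encode the completion rule quoted from the documentation above.

```scala
object MergeCompletionModel {
  // Hypothetical model of the documented rule, not Akka code:
  //   eagerClose = false -> complete only when every upstream has completed
  //   eagerClose = true  -> complete as soon as any upstream has completed
  def mergeCompletes(upstreamCompleted: Seq[Boolean], eagerClose: Boolean): Boolean =
    if (eagerClose) upstreamCompleted.exists(identity)
    else upstreamCompleted.forall(identity)

  def main(args: Array[String]): Unit = {
    // fileSource has completed; toRetry has not, because it sits downstream of merge
    val inputs = Seq(true, false)
    println(mergeCompletes(inputs, eagerClose = false)) // false
    println(mergeCompletes(inputs, eagerClose = true))  // true
  }
}
```

With the default setting, the second input can only flip to completed after merge itself completes, so the model stays at false indefinitely.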
If you want your specific graph to complete after fileSource completes, then just set eagerClose=true, which will cause merge to complete once fileSource completes. E.g.:
// Pass true as the second argument (eagerClose)
val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge"))
A simpler solution exists for your problem. Just use a single Flow.map stage which utilizes a tail-recursive function:
//Note: there is no use of akka in this implementation
type FileInputType = (Int, Array[String])
@scala.annotation.tailrec
def recursiveRetry(fileInput : FileInputType) : FileInputType =
  fileInput match {
    case (r,_) if r >= 3 => fileInput
    case (r,a) => recursiveRetry((r+1, a))
  }
Your stream would then be reduced to
//ring-fenced akka code
val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry
fileSource ~> recursiveRetryFlow ~> toSink ~> sink
The result is a cleaner stream that avoids mixing "business logic" with Akka code, so the retry functionality can be unit tested completely independently of any third-party library. The retry loop you have embedded in your stream is the business logic, which means the mixed implementation stays tightly coupled to Akka going forward, for better or worse.
Also, in the segregated solution the cycle is contained in a tail recursive function, which is idiomatic Scala.
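As a sketch of the independent testing mentioned above, the function can be exercised with plain assertions and no Akka on the classpath. The `RecursiveRetryCheck` object is a hypothetical wrapper added for illustration; the function body is the same as recursiveRetry above.

```scala
import scala.annotation.tailrec

object RecursiveRetryCheck {
  type FileInputType = (Int, Array[String])

  // Same logic as recursiveRetry above: bump the counter until it reaches 3
  @tailrec
  def recursiveRetry(fileInput: FileInputType): FileInputType =
    fileInput match {
      case (r, _) if r >= 3 => fileInput
      case (r, a)           => recursiveRetry((r + 1, a))
    }

  def main(args: Array[String]): Unit = {
    val payload = Array("a", "b")
    val (count, data) = recursiveRetry((0, payload))
    assert(count == 3)      // counter stops at the threshold
    assert(data eq payload) // the payload array is passed through untouched
    assert(recursiveRetry((5, payload))._1 == 5) // already past the threshold
    println("all checks passed")
  }
}
```

The same assertions could be moved into whichever test framework the project already uses, since nothing here depends on a materializer or an actor system.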