
Why doesn't the Akka Streams cycle in this graph end?


I would like to create a graph that loops n times before going to the sink. I've created this sample, which fulfills my requirements, but it doesn't terminate after reaching the sink, and I really don't understand why. Can someone enlighten me?

Thanks.

    import akka.actor.ActorSystem
    import akka.stream.scaladsl._
    import akka.stream.{ActorMaterializer, UniformFanOutShape}

    import scala.concurrent.Future

    object test {
      def main(args: Array[String]) {
        val ignore: Sink[Any, Future[Unit]] = Sink.ignore
        val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
          sink => {
            import FlowGraph.Implicits._

            val fileSource = Source.single((0, Array[String]()))
            val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
            val afterMerge = Flow[(Int, Array[String])].map {
              e =>
                println("after merge")
                e
            }
            val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
            val toRetry = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("retry " + (r < 3) + " " + r)
                r < 3
              }
            }.map {
              case (r, s) => (r + 1, s)
            }
            val toSink = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("sink " + (r >= 3) + " " + r)
                r >= 3
              }
            }
            merge.preferred <~ toRetry <~ broadcastArray
            fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
          }
        }
        implicit val system = ActorSystem()
        implicit val _ = ActorMaterializer()
        val run: Future[Unit] = closed.run()
        import system.dispatcher
        run.onComplete {
          case _ => {
            println("finished")
            system.shutdown()
          }
        }
      }
    }

Solution

  • The Stream is never completed because the merge never signals completion.

    After formatting your graph structure, it basically looks like:

    //ignoring the preferred which is inconsequential
    
    fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
                  merge <~ toRetry    <~ broadcastArray
    

    The non-completion problem is rooted in your merge step:

    // 2 inputs into merge
    
    fileSource ~> merge 
                  merge <~ toRetry
    

    Once the fileSource has emitted its single element (namely (0, Array.empty[String])) it sends out a complete message to merge.

    However, the fileSource's completion message gets blocked at the merge. From the documentation:

    akka.stream.scaladsl.MergePreferred

    Completes when all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)

    The merge will not send out complete until all of its input streams have completed.

    // fileSource is complete ~> merge 
    //                           merge <~ toRetry is still running
    
    // complete fileSource + still running toRetry = still running merge
    

    Therefore, merge will wait until toRetry also completes. But toRetry will never complete because it is waiting for merge to complete.
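
    The completion rule quoted above can be illustrated with a toy model in plain Scala. This is only a sketch of the documented rule, not Akka's implementation; the names MergeCompletionModel and mergeCompletes are hypothetical and exist only for this illustration.

    ```scala
    // Toy model only: this is NOT Akka's implementation, just the documented rule.
    object MergeCompletionModel {

      // upstreamsCompleted(i) is true once the i-th input of the merge has completed.
      def mergeCompletes(upstreamsCompleted: Seq[Boolean], eagerClose: Boolean): Boolean =
        if (eagerClose) upstreamsCompleted.exists(identity) // one completed upstream is enough
        else upstreamsCompleted.forall(identity)            // every upstream must have completed

      def main(args: Array[String]): Unit = {
        // fileSource has completed; toRetry (fed by the cycle) has not.
        val inputs = Seq(true, false)
        println(mergeCompletes(inputs, eagerClose = false)) // prints false
        println(mergeCompletes(inputs, eagerClose = true))  // prints true
      }
    }
    ```

    With eagerClose=false the second input would have to complete first, and in this graph that input sits downstream of the merge itself, so the condition can never become true.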

    If you want your specific graph to complete after fileSource completes, set eagerClose=true, which causes merge to complete as soon as any one of its inputs (here, fileSource) completes. E.g.:

    //Add this true                                             |
    //                                                          V
    val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge"))
    

    Without the Stream Cycle

    A simpler solution exists for your problem. Just use a single Flow.map stage that applies a tail-recursive function:

    //Note: there is no use of akka in this implementation
    
    type FileInputType = (Int, Array[String])
    
    @scala.annotation.tailrec
    def recursiveRetry(fileInput : FileInputType) : FileInputType = 
      fileInput match { 
        case (r,_) if r >= 3  => fileInput
        case (r,a)            => recursiveRetry((r+1, a))
      }    
    

    Your stream would then be reduced to

    //ring-fenced akka code
    
    val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry
    
    fileSource ~> recursiveRetryFlow ~> toSink ~> sink
    

    The result is a cleaner stream, and it avoids mixing "business logic" with Akka code. The retry loop embedded in your stream is the business logic, so the mixed implementation is tightly coupled to Akka going forward, for better or worse. Keeping it separate allows the retry functionality to be unit tested completely independently of any third-party library.

    Also, in the segregated solution the cycle is contained in a tail-recursive function, which is idiomatic Scala.
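
    As a minimal sketch of what testing the retry logic without Akka could look like, the function can be checked with plain assertions. The wrapper object RetryLogic and the sample payload are assumptions made for this illustration; recursiveRetry itself is the function shown earlier.

    ```scala
    // Plain Scala, no Akka on the classpath required.
    object RetryLogic {
      type FileInputType = (Int, Array[String])

      // Same function as in the answer: bump the counter until it reaches 3,
      // carrying the payload along unchanged.
      @scala.annotation.tailrec
      def recursiveRetry(fileInput: FileInputType): FileInputType =
        fileInput match {
          case (r, _) if r >= 3 => fileInput
          case (r, a)           => recursiveRetry((r + 1, a))
        }

      def main(args: Array[String]): Unit = {
        val payload = Array("a", "b") // hypothetical sample data
        val (count, result) = recursiveRetry((0, payload))

        assert(count == 3)                           // counter was driven up to the limit
        assert(result eq payload)                    // the very same array instance comes back
        assert(recursiveRetry((5, payload))._1 == 5) // inputs already past the limit are untouched
        println("all checks passed")
      }
    }
    ```

    The same assertions could be moved into whichever test framework the project already uses; nothing here depends on a materializer or an actor system.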