scala, akka-stream

Akka Streams: stop a stream after processing n elements


I have an Akka Streams flow that reads from a file using Alpakka, processes the data, and writes the result to a file. I want to stop the flow after it has processed n elements, measure how long the run took, and then call system.terminate(). How can I achieve this?

My flow looks like this:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._

  sourceFile ~> mainFlow ~> sinkFile

  ClosedShape
})

graph.run()

Do you have any ideas? Thanks.


Solution

  • Agreeing with what @Viktor said: first of all, you don't need the GraphDSL to achieve this. You can use take(n), which completes the stream once n elements have passed through it.

    Secondly, you can use mapMaterializedValue to attach a callback to your sink's materialized value (which should in turn materialize to a Future[Something]).

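      import java.util.concurrent.TimeUnit
      import scala.concurrent.Future
      import scala.concurrent.duration.FiniteDuration
      import scala.util.{Failure, Success}
      import akka.stream.scaladsl.{Keep, RunnableGraph}

      val graph: RunnableGraph[Future[FiniteDuration]] =
        sourceFile
          .via(mainFlow)
          .take(N)
          .toMat(sinkFile)(Keep.right)
          .mapMaterializedValue { f =>
            // This block runs at materialization time, so the clock starts when the stream starts
            val start = System.nanoTime()
            f.map(_ => FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
          }
    
      // onComplete receives a Try[FiniteDuration], so handle both outcomes
      graph.run().onComplete {
        case Success(duration) => println(s"Elapsed time: $duration")
        case Failure(error)    => println(s"Stream failed: $error")
      }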
    

    Note that you are going to need an implicit ExecutionContext in scope for the map and onComplete calls on the Future.
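
The snippet above leaves out the last step of the question, terminating the actor system. Below is a minimal self-contained sketch of the whole sequence, assuming Akka 2.6 or later (where an implicit ActorSystem also provides the materializer). The names TimedTake and timedRun are made up for illustration, and the in-memory source, flow and sink are stand-ins for sourceFile, mainFlow and sinkFile, which the question does not show.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object TimedTake {
  // Runs a stream that stops after n elements and reports (element count, elapsed time).
  // Source, flow and sink are hypothetical stand-ins for sourceFile, mainFlow and sinkFile.
  def timedRun(n: Long)(implicit system: ActorSystem): Future[(Long, FiniteDuration)] = {
    import system.dispatcher // ExecutionContext used by Future.map below

    val source   = Source.fromIterator(() => Iterator.from(1)) // unbounded input
    val mainFlow = Flow[Int].map(_ * 2)
    val sink     = Sink.fold[Long, Int](0L)((count, _) => count + 1)

    source
      .via(mainFlow)
      .take(n) // completes the stream after n elements
      .toMat(sink)(Keep.right)
      .mapMaterializedValue { done =>
        val start = System.nanoTime() // evaluated when the graph is materialized
        done.map(count => (count, (System.nanoTime() - start).nanos))
      }
      .run()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("timed-take")
    import system.dispatcher

    timedRun(1000).onComplete { result =>
      result match {
        case Success((count, elapsed)) => println(s"Processed $count elements in $elapsed")
        case Failure(error)            => println(s"Stream failed: $error")
      }
      system.terminate() // shut down whether the stream succeeded or failed
    }
  }
}
```

Calling system.terminate() inside onComplete, outside the pattern match, means the JVM can exit even if the stream fails. With a file sink, the Future completes once the sink has finished, so the measured time covers the write as well.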

    EDIT

    Even if you have to use the GraphDSL, the same concepts apply. You only need to expose the materialized Future of your sink, by passing the sink to GraphDSL.create, and map on that.

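      // Something is a placeholder for whatever type the Future materialized by sinkFile carries
      val graph: RunnableGraph[Future[Something]] =
        RunnableGraph.fromGraph(GraphDSL.create(sinkFile) { implicit builder: GraphDSL.Builder[Future[Something]] => snk =>
          import GraphDSL.Implicits._

          // take(N) completes the stream after N elements, as in the version without the GraphDSL
          sourceFile ~> mainFlow.take(N) ~> snk

          ClosedShape
        })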
    
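      import scala.util.{Failure, Success}

      val timedGraph: RunnableGraph[Future[FiniteDuration]] =
        graph.mapMaterializedValue { f =>
          val start = System.nanoTime()
          f.map(_ => FiniteDuration(System.nanoTime() - start, TimeUnit.NANOSECONDS))
        }
    
      // onComplete receives a Try[FiniteDuration], so handle both outcomes
      timedGraph.run().onComplete {
        case Success(duration) => println(s"Elapsed time: $duration")
        case Failure(error)    => println(s"Stream failed: $error")
      }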
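
To check where take fits in the GraphDSL form, here is a small self-contained sketch, again assuming Akka 2.6 or later. GraphDslTake and firstN are hypothetical names, and the in-memory source, flow and Sink.seq stand in for sourceFile, mainFlow and sinkFile. The sink is passed to GraphDSL.create so that its Future becomes the graph's materialized value.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.collection.immutable
import scala.concurrent.Future

object GraphDslTake {
  // Builds a closed graph that lets only the first n processed elements reach the sink.
  def firstN(n: Long): RunnableGraph[Future[immutable.Seq[Int]]] =
    RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Int]) { implicit builder => snk =>
      import GraphDSL.Implicits._

      // Hypothetical stand-ins for sourceFile and mainFlow
      val source   = Source.fromIterator(() => Iterator.from(1)) // unbounded input
      val mainFlow = Flow[Int].map(_ * 2)

      // take(n) is applied to the flow before it is wired to the sink
      source ~> mainFlow.take(n) ~> snk

      ClosedShape
    })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("graph-dsl-take")
    import system.dispatcher

    firstN(3).run().onComplete { result =>
      println(result) // Success(Vector(2, 4, 6))
      system.terminate()
    }
  }
}
```

Because the source here is unbounded, the stream finishes only because take cancels upstream and completes downstream after the third element.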