Search code examples
scalaakkaakka-stream

How to signal the Sink when all elements have been processed?


I have a graph that accepts a sequence of files, processes them one by one ant then at the end of the execution, the program should return success (0) or failure (-1) if all the executions have succeeded or failed.
How could this last step be achieved? How could the Sink know when it is receiving the result for the last file?

val graph = createGraph("path-to-list-of-files")
val result = graph.run()

def createGraph(fileOrPath: String): RunnableGraph[NotUsed] = {
  printStage("PREPARING") {
  val producer: Source[ProducerFile, NotUsed] = Producer(fileOrPath).toSource()
  val validator: Flow[ProducerFile, ProducerFile, NotUsed] = Validator().toFlow()
  val provisioner: Flow[ProducerFile, PrivisionerResult, NotUsed] = Provisioner().toFlow()
  val executor: Flow[PrivisionerResult, ExecutorResult, NotUsed] = Executor().toFlow()
  val evaluator: Flow[ExecutorResult, EvaluatorResult, NotUsed] = Evaluator().toFlow()
  val reporter: Sink[EvaluatorResult, Future[Done]] = Reporter().toSink()

  val graphResult = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._
    producer ~> validator ~> provisioner ~> executor ~> evaluator ~> reporter

    ClosedShape
  })
  printLine("The graph pipeline was created")
  graphResult
}

Solution

  • Your reporter Sink already materializes to a Future[Done], which you can hook to if you want to run some code when all your elements have processed.

    However, at the moment you are not exposing it in your graph. Although there is a way to expose it using the graph DSL, in your case it is even easier to use the fluent DSL to achieve this:

      val graphResult: RunnableGraph[Future[Done]] = producer
        .via(validator)
        .via(provisioner)
        .via(executor)
        .via(evaluator)
        .toMat(reporter)(Keep.right)
    

    This will give you back the Future[Done] when you run your graph

    val result: Future[Done] = graph.run()
    

    which then you can hook to - e.g.

    result.onComplete {
      case Success(_) => println("Success!")
      case Failure(_) => println("Failure..")
    }