How to stop runnable graph


I am taking my first steps with Akka Streams. I have a graph similar to this one, copied from the documentation:

val topHeadSink = Sink.head[Int]
val bottomHeadSink = Sink.head[Int]
val sharedDoubler = Flow[Int].map(_ * 2)

val g = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
  (topHS, bottomHS) =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](2))
    Source.single(1) ~> broadcast.in

    broadcast.out(0) ~> sharedDoubler ~> topHS.in
    broadcast.out(1) ~> sharedDoubler ~> bottomHS.in
    ClosedShape
})

I can run the graph with g.run(), but how can I stop it? Under what circumstances should I stop it (other than when it is no longer needed for business reasons)? The graph is contained within an actor. If the actor crashes, what happens to the graph's underlying actor? Will it terminate as well?


Solution

  • As described in the documentation, the way to complete a graph from outside the graph is with a KillSwitch. The example that you copied from the documentation is not a good candidate to illustrate this approach, because its source emits only a single element and the stream completes almost immediately when you run it. Let's adjust the graph to make it easier to see the KillSwitch in action:

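    import akka.stream.{ClosedShape, KillSwitches}
    import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}

    // An implicit materializer must be in scope to run the graph
    // (in Akka 2.6+ an implicit ActorSystem provides one).

    val topSink = Sink.foreach(println)
    val bottomSink = Sink.foreach(println)
    val sharedDoubler = Flow[Int].map(_ * 2)
    val killSwitch = KillSwitches.single[Int]

    // Passing the kill switch to GraphDSL.create exposes it as part of the materialized value.
    val g = RunnableGraph.fromGraph(GraphDSL.create(topSink, bottomSink, killSwitch)((_, _, _)) {
      implicit builder => (topS, bottomS, switch) =>

      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[Int](2))
      // The switch sits between the source and the broadcast, so it can stop the whole graph.
      Source.fromIterator(() => (1 to 1000000).iterator) ~> switch ~> broadcast.in

      broadcast.out(0) ~> sharedDoubler ~> topS.in
      broadcast.out(1) ~> sharedDoubler ~> bottomS.in
      ClosedShape
    })

    val res = g.run() // res is of type (Future[Done], Future[Done], UniqueKillSwitch)
    Thread.sleep(1000)
    res._3.shutdown() // complete the stream from the outside
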

    The source now emits one million elements, and each sink prints the elements it receives from the broadcast. The stream runs for one second, which is not enough time to process all one million elements, and then we call shutdown to complete it.
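
    For a linear stream, the same idea can be sketched without GraphDSL by keeping the kill switch as the materialized value with viaMat. The following is only an illustration under some assumptions: it targets Akka 2.6+ (where an implicit ActorSystem supplies the materializer), and the object name KillSwitchSketch, the helper runAndStop, and the throttle rate are made up for this example.

    ```scala
    import akka.Done
    import akka.actor.ActorSystem
    import akka.stream.KillSwitches
    import akka.stream.scaladsl.{Keep, Sink, Source}

    import scala.concurrent.Await
    import scala.concurrent.duration._

    object KillSwitchSketch {
      // Hypothetical helper: runs an endless stream and stops it after `runFor`.
      def runAndStop(runFor: FiniteDuration = 1.second): Done = {
        // Assumption: Akka 2.6+, where the implicit system provides a materializer.
        implicit val system: ActorSystem = ActorSystem("kill-switch-sketch")
        try {
          val (killSwitch, done) = Source
            .fromIterator(() => Iterator.from(1))      // never completes on its own
            .throttle(10, 1.second)                    // roughly ten elements per second
            .viaMat(KillSwitches.single)(Keep.right)   // keep the UniqueKillSwitch
            .toMat(Sink.foreach[Int](n => println(n)))(Keep.both) // and the sink's Future[Done]
            .run()

          Thread.sleep(runFor.toMillis)
          killSwitch.shutdown() // completes downstream and cancels upstream

          // The sink's future completes once the stream has shut down.
          Await.result(done, 3.seconds)
        } finally {
          Await.result(system.terminate(), 3.seconds)
        }
      }

      def main(args: Array[String]): Unit = println(runAndStop())
    }
    ```

    Calling killSwitch.abort(someException) instead of shutdown() would fail the stream rather than complete it, so the sink's future would fail with that exception.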

    If you run a stream inside an actor, whether the lifecycle of the underlying actor (or actors) created to run the stream is tied to the lifecycle of the "enclosing" actor depends on how the materializer is created. Read the documentation for more information. The following blog post by Colin Breck about using an actor and KillSwitch to manage the lifecycle of a stream is helpful as well: http://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/
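
    As a rough sketch of how the materializer choice affects the lifecycle, the actor below creates its materializer from its own context, so the stream it runs is terminated when the actor stops. It also keeps a KillSwitch and shuts the stream down in postStop, which is the part that still matters if a system-wide materializer is used instead. This assumes Akka 2.6+ (where Materializer(context) is available); the names StreamOwner and StreamOwnerSketch and the "stop" message are hypothetical.

    ```scala
    import akka.actor.{Actor, ActorSystem, Props}
    import akka.stream.{KillSwitches, Materializer, UniqueKillSwitch}
    import akka.stream.scaladsl.{Keep, Sink, Source}

    import scala.concurrent.Await
    import scala.concurrent.duration._

    // Hypothetical actor that owns a stream for as long as it is alive.
    class StreamOwner extends Actor {
      // Assumption: Akka 2.6+. A materializer created from the actor's context
      // is bound to this actor, so its streams are terminated when the actor stops.
      implicit val mat: Materializer = Materializer(context)

      private val killSwitch: UniqueKillSwitch = Source
        .tick(0.seconds, 100.millis, 1)            // emits 1 every 100 ms, never completes
        .viaMat(KillSwitches.single)(Keep.right)   // keep the kill switch
        .to(Sink.foreach[Int](n => println(n)))
        .run()

      def receive: Receive = {
        case "stop" => context.stop(self)
      }

      // Explicit shutdown; required if the stream were run with a
      // system-wide materializer that outlives this actor.
      override def postStop(): Unit = killSwitch.shutdown()
    }

    object StreamOwnerSketch {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("stream-owner-sketch")
        val owner = system.actorOf(Props(new StreamOwner), "owner")
        Thread.sleep(500)  // let the stream run briefly
        owner ! "stop"     // stopping the actor also stops the stream
        Thread.sleep(500)
        Await.result(system.terminate(), 3.seconds)
      }
    }
    ```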