Regarding clean shutdown for actor system and akka stream


We wish to accomplish the following on Akka stream shutdown:

  1. The source is stopped,
  2. the elements already in the stream are processed to completion, up to a timeout,
  3. then the stream/actor system is shut down.

We have the following code for shutting down our actor system (Akka 2.4).

    try {
      Await.ready(actorSystem.terminate(), sleepSeconds.seconds)
    } catch {
      case ex: Throwable => log.error("Failed to terminate actor system", ex)
    }

I am trying to understand the Akka documentation, and it's not clear to me whether we are required to shut down the materializer (for our Akka stream) separately. There is a shutdown method on the materializer and one on the actor system.

In other words, what is the idiomatic sequence of operations for shutting down our Akka stream cleanly/gracefully?


Solution

  • You have to shut down the stream before terminating the actor system. If you don't, elements held in internal buffers may be lost and never reach the final Sink.

    Consider the following example with Akka 2.6 (if some APIs differ from 2.4, please adjust accordingly):

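    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import scala.util.Random

    implicit val actorSystem: ActorSystem = ActorSystem("test")
    val source = Source(LazyList.fill(100_000)(Random.nextInt(100)))
    val flow = Flow[Int]
      .throttle(3, 1.second)
      .map { s =>
        println(s) // side effect: shows which elements entered the flow
        s
      }
      .grouped(4) // holds up to 4 elements before emitting a group
      .map(s => (s.size, s.max))
    val stream = source.via(flow).to(Sink.foreach(println))
    val _ = stream.run()
    Thread.sleep(5_400)
    // Terminating the actor system here stops the running stream abruptly
    val _ = Await.result(actorSystem.terminate(), 2.seconds)
    println("Terminated")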
    

    When it runs, the final output before actor system termination may look like this:

    36
    84
    38
    32
    (4,84)
    29
    Terminated
    

    Element 29 is produced but gets stuck in grouped and never reaches the final Sink.foreach(println).

    This can be overcome with the help of a kill switch:

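    import akka.actor.ActorSystem
    import akka.stream.KillSwitches
    import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import scala.util.Random

    implicit val actorSystem: ActorSystem = ActorSystem("test")
    val killSwitch = KillSwitches.single[Int]
    // Keep.right keeps the kill switch as the materialized value of the source
    val source = Source(LazyList.fill(100_000)(Random.nextInt(100))).viaMat(killSwitch)(Keep.right)
    val flow = Flow[Int]
      .throttle(3, 1.second)
      .map { s =>
        println(s) // side effect: shows which elements entered the flow
        s
      }
      .grouped(4)
      .map(s => (s.size, s.max))
    // Keep.both exposes the kill switch and the sink's Future[Done]
    val stream = source.viaMat(flow)(Keep.left).toMat(Sink.foreach(println))(Keep.both)
    val (kill, done) = stream.run()
    Thread.sleep(5_400)
    kill.shutdown() // complete the stream from the kill switch onwards
    val _ = Await.result(done, 2.seconds) // wait until the sink has seen completion
    println("Terminated")
    val _ = Await.result(actorSystem.terminate(), 2.seconds)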
    

    Now the final output is complete:

    10
    64
    77
    6
    (4,77)
    74
    24
    (2,74)
    Terminated
    

    The output (2,74) shows that grouped was flushed.
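
The flush behaviour of grouped on upstream completion can also be checked in isolation, without timing or a kill switch. The following is a minimal sketch, assuming Akka 2.6 and Scala 2.13; the object name GroupedFlushDemo is only illustrative. A finite source of six elements completes on its own, and grouped(4) emits the partial last group before completing.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Illustrative name, not taken from the answer above.
object GroupedFlushDemo {
  // Runs a finite source through grouped(4) and collects every emitted group.
  def run(): Seq[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("grouped-flush-demo")
    try {
      val groups = Source(1 to 6).grouped(4).runWith(Sink.seq)
      // The stream completes by itself, so the future completes with all groups.
      Await.result(groups, 5.seconds)
    } finally {
      Await.ready(system.terminate(), 5.seconds)
    }
  }
}
```

Calling GroupedFlushDemo.run() should return two groups: the full group 1, 2, 3, 4 and the flushed remainder 5, 6. A kill switch shutdown triggers the same completion path for an otherwise unbounded source.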

    Please note the use of viaMat and toMat to capture two materialized values (materialized values are described in the Akka Streams documentation):

    • the UniqueKillSwitch materialized by KillSwitches.single[Int]
    • the Future[Done] materialized by Sink.foreach

    A KillSwitch can only initiate the termination of a stream; you still have to wait for the stream to complete. This can be done via the Future[Done] from the Sink:

    kill.shutdown()
    val _ = Await.result(done, 2.seconds)
    

    This initiates a shutdown and then waits for it to complete.
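
To cover the "up to a timeout" requirement from the question, the two steps can be wrapped in a small helper. This is only a sketch under assumptions: GracefulStop, stopStreamThenSystem and drainTimeout are hypothetical names, not Akka APIs, and the choice to terminate the actor system anyway after a timeout is one possible policy. Note that calling abort on the kill switch after shutdown would have no effect, since only the first completion command on a UniqueKillSwitch is honoured.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.UniqueKillSwitch
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object GracefulStop {
  // Returns true if the stream completed successfully within drainTimeout.
  def stopStreamThenSystem(
      kill: UniqueKillSwitch,
      done: Future[Done],
      drainTimeout: FiniteDuration,
      system: ActorSystem
  ): Boolean = {
    // 1. Complete the stream from the kill switch onwards.
    kill.shutdown()
    val drained =
      try {
        // 2. Give in-flight elements up to drainTimeout to reach the sink.
        Await.ready(done, drainTimeout).value.exists(_.isSuccess)
      } catch {
        case _: TimeoutException => false // still running; stop waiting
      }
    // 3. Terminate the actor system; a stream that is still running at this
    //    point is stopped abruptly. This call may itself time out and throw.
    Await.ready(system.terminate(), drainTimeout)
    drained
  }
}
```

With the values from the example above, this would be called as GracefulStop.stopStreamThenSystem(kill, done, 2.seconds, actorSystem). A false result means some elements may not have reached the sink.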

    Where the kill switch should be integrated

    To answer the question from the comments: the kill switch needs to be integrated as close to the source as possible. It has to be placed before all side effects; in my example these are map and the sink, because of println. In the real world, a side effect could be storing a value in a database or sending a message to an external queue/topic.

    On shutdown, the kill switch completes its downstream (and cancels its upstream), and this completion signal must travel downstream to trigger the termination logic of each flow and sink.
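
The placement rule can be illustrated with a small experiment. This is a sketch assuming Akka 2.6 and Scala 2.13; KillSwitchPlacementDemo and the in-memory queue standing in for a database are illustrative assumptions. The kill switch sits directly after the source, ahead of the side effect, so every element that triggered the side effect should also reach the sink once shutdown has completed.

```scala
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

// Illustrative name, not taken from the answer above.
object KillSwitchPlacementDemo {
  // Returns (elements seen by the side effect, elements that reached the sink).
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("placement-demo")
    // Stands in for an external side effect such as a database write.
    val seen = new ConcurrentLinkedQueue[Int]()
    try {
      val (kill, groups) =
        Source.fromIterator(() => Iterator.from(1))
          .viaMat(KillSwitches.single[Int])(Keep.right) // before any side effect
          .throttle(100, 1.second)
          .map { n => seen.add(n); n }                  // the side effect
          .grouped(4)
          .toMat(Sink.seq)(Keep.both)
          .run()
      Thread.sleep(300)
      kill.shutdown()
      // Wait for completion; grouped flushes its partial group on the way.
      val delivered = Await.result(groups, 5.seconds).flatten
      (seen.asScala.toSeq, delivered)
    } finally {
      Await.ready(system.terminate(), 5.seconds)
    }
  }
}
```

Both sequences returned by KillSwitchPlacementDemo.run() should be equal. If the kill switch were moved after grouped instead, elements buffered inside grouped at shutdown time would already have hit the side effect without ever reaching the sink.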