Search code examples
scalaakkaakka-stream

Gracefully stopping an Akka Stream


Given's akka doc I would expect the stream to stop after 7th/8th element. Why is it not stopping ? It continues all the way to the last element (20th).

What I want to achive is that on system terminate, the stream stops requesting new elements and the system to wait termination until all elements in the stream are fully processed (reach the sink)

object StreamKillSwitch extends App {

  implicit val system = ActorSystem(Behaviors.ignore, "sks")
  implicit val ec: ExecutionContext = system.executionContext

  val (killStream, done) =
    Source(1 to 20)
      .viaMat(KillSwitches.single)(Keep.right)
      .map(i => {
        system.log.info(s"Start task $i")
        Thread.sleep(100)
        system.log.info(s"End task $i")
        i
      })
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") {
      () => Future(killStream.shutdown()).map(_ => Done)
    }

  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "wait-processing-complete") {
      () => done
    }

  Thread.sleep(720)

  system.terminate()
  Await.ready(system.whenTerminated, 5.seconds)
}

Solution

  • I was able to "fix" the problem but I'm struggling to explain the behaviour.

    For some reason, your implementation of the map (specifically Thread.sleep) is causing the future/promise callbacks in killStream.shutdown() to not propagate fast enough. My guess is that it fills up the main dispatcher with blocked threads.

    You may increase the amount of source elements Source(1 to 10000) and the Await timeout to see that the kill switch eventually propagates.

    However, adding async boundaries does solve the problem and has the expected results.

    Replace your map with following mapAsync and it works as expected.

    .mapAsync(1) { i =>
      Future {
        system.log.info(s"Start task $i")
        Thread.sleep(100)
        system.log.info(s"Start task $i")
        i
      }
    }
    

    Similar result can be achieved by addition of async marker.

    .map(i => {
      system.log.info(s"Start task $i")
      Thread.sleep(100)
      system.log.info(s"End task $i")
      i
    })
    .async