Given the Akka docs, I would expect the stream to stop after the 7th or 8th element. Why is it not stopping? It continues all the way to the last (20th) element.
What I want to achieve is that, on system termination, the stream stops requesting new elements and the system delays termination until all elements already in the stream are fully processed (have reached the sink).
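import akka.Done
import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object StreamKillSwitch extends App {
  implicit val system = ActorSystem(Behaviors.ignore, "sks")
  implicit val ec: ExecutionContext = system.executionContext

  // Materialize both the kill switch and the sink's completion future.
  val (killStream, done) =
    Source(1 to 20)
      .viaMat(KillSwitches.single)(Keep.right)
      .map(i => {
        system.log.info(s"Start task $i")
        Thread.sleep(100) // simulated blocking work
        system.log.info(s"End task $i")
        i
      })
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  // Phase 1: stop pulling new elements from the source.
  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") {
      () => Future(killStream.shutdown()).map(_ => Done)
    }

  // Phase 2: wait until in-flight elements have reached the sink.
  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "wait-processing-complete") {
      () => done
    }

  Thread.sleep(720)
  system.terminate()
  Await.ready(system.whenTerminated, 5.seconds)
}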
I was able to "fix" the problem, but I'm struggling to explain the behaviour.
For some reason, your implementation of the map (specifically the Thread.sleep call) prevents the future/promise callbacks triggered by killStream.shutdown() from propagating fast enough. My guess is that it fills up the main dispatcher with blocked threads. Without an async boundary the operators are fused and run inside a single actor, so the shutdown signal probably has to wait until that actor takes a break from the blocking map.
If you increase the number of source elements to Source(1 to 10000) and raise the Await timeout, you can see that the kill switch does eventually propagate.
However, adding an async boundary does solve the problem and gives the expected result.
Replace your map with the following mapAsync and it works as expected:
      .mapAsync(1) { i =>
        Future {
          system.log.info(s"Start task $i")
          Thread.sleep(100)
          system.log.info(s"End task $i")
          i
        }
      }
A similar result can be achieved by adding the async marker:
      .map(i => {
        system.log.info(s"Start task $i")
        Thread.sleep(100)
        system.log.info(s"End task $i")
        i
      })
      .async
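
To make the guess about the busy thread more concrete, here is a small plain-Scala analogy. It does not use Akka and is not how Akka is implemented internally; it only assumes that a stop signal has to be handled on the same thread that does the blocking work. The names BusyThreadAnalogy, runAsOneTask and runAsManyTasks are made up for this sketch.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Analogy only: one worker thread stands in for the thread running the stream.
object BusyThreadAnalogy {

  /** All elements are handled inside ONE task, like a blocking map with no boundary. */
  def runAsOneTask(elements: Int, workMillis: Long, stopAfterMillis: Long): Int = {
    val worker    = Executors.newSingleThreadExecutor()
    val stopped   = new AtomicBoolean(false)
    val processed = new AtomicInteger(0)
    try {
      worker.execute { () =>
        var i = 0
        while (i < elements && !stopped.get()) {
          Thread.sleep(workMillis) // simulated blocking work
          processed.incrementAndGet()
          i += 1
        }
      }
      Thread.sleep(stopAfterMillis)
      // The stop signal is queued behind the running task on the same thread,
      // so it only runs once the loop above has already finished.
      worker.execute(() => stopped.set(true))
    } finally {
      worker.shutdown()
      worker.awaitTermination(10, TimeUnit.SECONDS)
    }
    processed.get()
  }

  /** Each element is its own task, so the worker yields between elements. */
  def runAsManyTasks(elements: Int, workMillis: Long, stopAfterMillis: Long): Int = {
    val worker    = Executors.newSingleThreadExecutor()
    val stopped   = new AtomicBoolean(false)
    val processed = new AtomicInteger(0)
    val finished  = new CountDownLatch(1)

    def step(i: Int): Unit =
      if (i >= elements || stopped.get()) finished.countDown()
      else {
        Thread.sleep(workMillis) // simulated blocking work
        processed.incrementAndGet()
        worker.execute(() => step(i + 1)) // re-queue: the stop signal can slip in here
      }

    try {
      worker.execute(() => step(0))
      Thread.sleep(stopAfterMillis)
      worker.execute(() => stopped.set(true))
      finished.await(10, TimeUnit.SECONDS)
    } finally worker.shutdown()
    processed.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"one task:   ${runAsOneTask(20, 20, 50)} elements processed")
    println(s"many tasks: ${runAsManyTasks(20, 20, 50)} elements processed")
  }
}
```

In the first variant the stop signal cannot be seen until every element is done, which resembles what the question observes. In the second variant the worker returns to its queue after each element, so the signal is picked up early and the run stops well before the last element.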