I am reading the akka-stream documentation about KillSwitch, and it has an example that illustrates the KillSwitch.shutdown method:
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()
doSomethingElse()
killSwitch.shutdown()
Await.result(last, 1.second) shouldBe 2
I have trouble understanding why the expected result should be 2.
As I read this example, the stream is delayed by 1 second. While it is paused, shutdown() gets invoked, so the kill switch tells the stream to shut down before the delay has elapsed. I don't see why the first 2 elements of the stream are expected to be emitted and delivered to the sink.
Could you help explain?
Note: when I run this example, I get the following exception, as I expected:
Exception in thread "main" java.util.NoSuchElementException: last of empty stream
at akka.stream.scaladsl.Sink$.$anonfun$last$3(Sink.scala:181)
There is a misunderstanding of the sample code. The result depends purely on how long doSomethingElse runs. You only get the exception if it takes too little time, because the kill switch then shuts the stream down before the first delayed element reaches the sink. To test this, you can replace it with Thread.sleep(2000), and you will get a result back from the Sink. If you increase the sleep value, the result increases as well.
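The experiment described above can be sketched as follows. This is a minimal sketch rather than the documentation's own code: it assumes Akka 2.6 or later (where an implicit ActorSystem is enough to materialize a stream), and the names KillSwitchDelayDemo and lastAfter are made up for illustration. The exact value it prints depends on timing and on the internal buffer of delay, so no particular number should be expected.

```scala
import akka.actor.ActorSystem
import akka.stream.{DelayOverflowStrategy, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka or its documentation.
object KillSwitchDelayDemo {

  // Runs the counting stream, waits for `pause`, shuts it down through the
  // kill switch, and returns the last element that reached the sink.
  // Assumes Akka 2.6+, where the implicit ActorSystem provides the materializer.
  def lastAfter(pause: FiniteDuration)(implicit system: ActorSystem): Int = {
    val countingSrc = Source
      .fromIterator(() => Iterator.from(1))
      .delay(1.second, DelayOverflowStrategy.backpressure)

    val (killSwitch, last) = countingSrc
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.last[Int])(Keep.both)
      .run()

    Thread.sleep(pause.toMillis) // stands in for doSomethingElse()
    killSwitch.shutdown()

    // Fails with NoSuchElementException if nothing reached the sink in time.
    Await.result(last, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("kill-switch-delay-demo")
    try println(lastAfter(2.seconds))
    finally system.terminate()
  }
}
```

Calling lastAfter with a pause well below one second should reproduce the "last of empty stream" exception from the question, since no element has passed the delay yet.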
Regarding your question in the comment:
delay shifts the emission of elements in time by a specified amount. Delay precision is 10ms to avoid unnecessary timer scheduling cycles. That is the reason you see this behaviour (you can check those details in the Scala docs of Flow).
If you want to send one message per second, try throttle instead:
.throttle(1, 1.second, 1, ThrottleMode.shaping)
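For context, here is a sketch of the same kill-switch example with throttle in place of delay. It assumes Akka 2.6 or later, and the names KillSwitchThrottleDemo and lastAfter are hypothetical. With this setup the last element should grow by roughly one per second of waiting, though the exact value still depends on timing.

```scala
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, ThrottleMode}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka or its documentation.
object KillSwitchThrottleDemo {

  // Emits at most one element per second, waits for `pause`, shuts the stream
  // down through the kill switch, and returns the last element seen by the sink.
  // Assumes Akka 2.6+, where the implicit ActorSystem provides the materializer.
  def lastAfter(pause: FiniteDuration)(implicit system: ActorSystem): Int = {
    val (killSwitch, last) = Source
      .fromIterator(() => Iterator.from(1))
      .throttle(1, 1.second, 1, ThrottleMode.shaping)
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.last[Int])(Keep.both)
      .run()

    Thread.sleep(pause.toMillis) // stands in for doSomethingElse()
    killSwitch.shutdown()

    Await.result(last, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("kill-switch-throttle-demo")
    try println(lastAfter(2500.millis))
    finally system.terminate()
  }
}
```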