Search code examples
scalaakka-stream

Don't completely understand Source.delay method or there is error in akka-stream docs


I am reading akka-stream documentation about KillSwitch and they have an example to illustrate the KillSwitch.shutdown method:

val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

doSomethingElse()

killSwitch.shutdown()

Await.result(last, 1.second) shouldBe 2

I have problems with understanding why expected result should be 2. As I see this example, stream is put to 1 second delay. While it is paused the shutdown() gets invoked so the kill switch tells the stream to shutdown before the delay is completed. I don't see why first 2 elements of the stream are expected to be emitted and delivered to the sink.

Could you help and explain?

Note: if I run this example I get as I expected following exception:

Exception in thread "main" java.util.NoSuchElementException: last of empty stream
    at akka.stream.scaladsl.Sink$.$anonfun$last$3(Sink.scala:181)

Solution

  • There is a misunderstanding of the sample code. The result purely depends on the runtime duration of doSomethingElse. Only if it takes too less time you get the exception. To test this you can replace it with Thread.sleep(2000) and you will get a result back from the Sink. In case you increase the sleep value the result also increases.

    Regarding your question in the comment:

    delay shifts elements emission in time by a specified amount. Delay precision is 10ms to avoid unnecessary timer scheduling cycles. That is the reason you see this behaviour (you can check those details in the Scala docs of Flow).

    In case you want to send one message per second, try throttle instead:

    .throttle(1, 1.second, 1, ThrottleMode.shaping)