Tags: scala, akka, akka-stream, reactive-streams

Akka Reactive Streams always one message behind


For some reason, my Akka streams always wait for a second message before "emitting"(?) the first.

Here is some example code that demonstrates my problem.

val rx = Source((1 to 100).toStream.map { t =>
  Thread.sleep(1000)
  println(s"doing $t")
  t
})
rx.runForeach(println)

yields output:

doing 1
doing 2
1
doing 3
2
doing 4
3
doing 5
4
doing 6
5
...

What I want:

doing 1
1
doing 2
2
doing 3
3
doing 4
4
doing 5
5
doing 6
6
...

Solution

  • The way your code is set up now, the transformation (the sleep and the println) is applied to the collection you hand to the Source, not to a stage of the stream, so it runs outside the stream's handling of downstream demand. You can see that behavior clearly (as @slouc stated) by removing the toStream on the range of numbers that represents the source. If you do that, the whole range is transformed first, before the Source starts responding to downstream demand. If you actually want to run a Source into a Sink and have a transformation step in the middle, you can structure things like this:

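    import akka.stream.scaladsl.{Flow, Sink, Source}

    // Assumes an implicit Materializer (in Akka 2.6+, an implicit ActorSystem) is in scope.

    // The transformation is now a stage inside the stream.
    val transform =
      Flow[Int].map { t =>
        Thread.sleep(1000)
        println(s"doing $t")
        t
      }

    Source((1 to 100).toStream)
      .via(transform)
      .to(Sink.foreach(println))
      .run()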
    

    With that change you get the desired effect: each element flowing downstream is processed all the way through the flow before the next element starts to be processed.
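As a rough illustration of why the version in the question runs one element ahead, the sketch below uses only the Scala standard library, with no Akka involved. It relies on two things: Stream.map evaluates the head eagerly and the tail lazily, and calling hasNext on a Stream's iterator forces the next cell, which runs the mapping function for that element. The object name StreamLookahead, the log buffer, and the consumer loop are hypothetical. The loop only imitates a source that checks for further elements before the current one is handed downstream, which is an assumption about Akka's internals rather than something stated in the answer.

```scala
import scala.collection.mutable.ListBuffer

object StreamLookahead {
  // Records the order in which things happen (hypothetical helper).
  private val log = ListBuffer.empty[String]

  def run(): List[String] = {
    log.clear()

    // Stream.map runs the function for the head right away; the tail is lazy.
    val mapped = (1 to 3).toStream.map { t =>
      log += s"doing $t"
      t
    }
    log += "stream built"

    val it = mapped.iterator
    while (it.hasNext) {
      val elem = it.next()
      // Imitates a source that asks "is there more?" before passing elem on.
      // On a Stream this forces the next cell, so the mapping function
      // runs for the next element here.
      val hasMore = it.hasNext
      log += s"emit $elem"
    }
    log.toList
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
  // Prints:
  //   doing 1
  //   stream built
  //   doing 2
  //   emit 1
  //   doing 3
  //   emit 2
  //   emit 3
}
```

The log has the same "one behind" shape as the output in the question. On Scala 2.13 and later, toStream still compiles but is deprecated.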