scala akka-stream backpressure

Akka-streams backpressure on broadcast with async processing


I am struggling to understand whether Akka Streams enforces backpressure on the Source when a Broadcast has one branch that takes a long time (and runs asynchronously) in the graph.

I tried buffer and batch to check whether any backpressure reached the source, but it does not look like it. I also tried flushing System.out, but that did not change anything.

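import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.duration._

object Test extends App {
  /* Necessary for akka stream */
  implicit val system: ActorSystem = ActorSystem("test")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val in = Source.tick(0.seconds, 1.second, 1)
    // runForeach materializes a second, independent copy of `in` right away,
    // so these "Produced" lines are not subject to this graph's backpressure
    in.runForeach(i => println("Produced " + i))

    val out = Sink.foreach[Int](println)
    val out2 = Sink.foreach[Int] { o => println(s"2 $o") }

    val bcast = builder.add(Broadcast[Int](2))

    // Sums up to 4 elements while downstream is not pulling
    val batchedIn: Source[Int, Cancellable] = in.batch(4, identity[Int]) {
      case (s, v) => println(s"Batched ${s + v}"); s + v
    }

    val f2 = Flow[Int].map(_ + 10)
    val f4 = Flow[Int].map { i => Thread.sleep(2000); i } // slow branch

    batchedIn ~> bcast ~> f2 ~> out
                 bcast ~> f4.async ~> out2
    ClosedShape
  })

  g.run()
}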

I would expect to see "Batched ..." in the console when running the program, and at some point the stream should be momentarily stuck because f4 is not fast enough to process the values. At the moment, neither happens: the numbers are generated continuously and no batching is done.

EDIT: I noticed that after some time, the batch messages do start to print in the console. I still don't know why this does not happen sooner, since backpressure should apply from the first elements.


Solution

  • This behavior is explained by the internal buffers that Akka introduces when async boundaries are set (here, the one created by f4.async).

    Buffers for asynchronous operators

    Internal buffers are introduced as an optimization when using asynchronous operators.


    While pipelining in general increases throughput, in practice there is a cost of passing an element through the asynchronous (and therefore thread crossing) boundary which is significant. To amortize this cost Akka Streams uses a windowed, batching backpressure strategy internally. It is windowed because as opposed to a Stop-And-Wait protocol multiple elements might be “in-flight” concurrently with requests for elements. It is also batching because a new element is not immediately requested once an element has been drained from the window-buffer but multiple elements are requested after multiple elements have been drained. This batching strategy reduces the communication cost of propagating the backpressure signal through the asynchronous boundary.
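    The windowed, batching protocol described in the quote can be illustrated with a small, dependency-free Scala model. This is a simplified sketch, not Akka's implementation: the object and method names are hypothetical, and refilling demand once half of the window has been drained is an assumption chosen for illustration. The window corresponds to the async boundary's input buffer, which Akka configures through akka.stream.materializer.max-input-buffer-size (16 by default in the reference.conf versions I know of). A window of that size lets several elements cross the boundary before upstream feels the slow branch, which may be why batching only starts after a while in the EDIT.

```scala
// Hypothetical, simplified model of windowed, batching demand across an
// async boundary. NOT Akka's code: the half-window refill threshold is an
// assumption made for illustration.
object WindowedDemandModel {

  /** Sizes of the successive request(n) signals a consumer sends upstream
    * while draining `total` elements through a buffer of `window` slots.
    */
  def requestSignals(window: Int, total: Int): List[Int] = {
    require(window > 0 && total >= 0, "window must be positive, total non-negative")
    val batch = math.max(1, window / 2) // assumed refill threshold: half the window
    val signals = List.newBuilder[Int]
    signals += window                   // initial demand fills the whole window
    var requested = window              // elements requested so far
    var drainedSinceRequest = 0
    for (_ <- 1 to total) {             // each iteration drains one element
      drainedSinceRequest += 1
      if (drainedSinceRequest == batch && requested < total) {
        signals += batch                // one batched request(n), not n request(1)s
        requested += batch
        drainedSinceRequest = 0
      }
    }
    signals.result()
  }

  def main(args: Array[String]): Unit = {
    val signals = requestSignals(window = 16, total = 40)
    println(s"request signals: $signals")
    println(s"signals sent: ${signals.size} instead of 40 one-by-one requests")
  }
}
```

    With a window of 16 and 40 elements, the model sends four request signals (16 up front, then 8 three times) instead of forty one-element requests; that reduction is the communication saving the quote refers to, and the up-front window is what lets elements be "in flight" before any backpressure is felt.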

    I understand that this is a toy stream, but if you explain what your goal is, I will try to help you.
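    To check the internal-buffer explanation against the question's own graph shape, here is a sketch (my own variation, not taken from either answer) that keeps the slow branch behind .async but shrinks that boundary's input buffer to one element with Attributes.inputBuffer, the attribute the Akka buffer documentation uses for this purpose. Assumptions: a finite Source(1 to 12) replaces Source.tick so the program terminates, the sleep is 200 ms instead of 2 s, and the object and method names are hypothetical. Like the question, it uses the classic ActorMaterializer, which is deprecated in Akka 2.6 (where an implicit ActorSystem is enough).

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object; a sketch, not a definitive setup.
object SmallInputBufferDemo {

  /** Runs the graph once and returns the sum of all values the slow branch received. */
  def sumSeenBySlowBranch(): Int = {
    implicit val system: ActorSystem = ActorSystem("small-input-buffer-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // The slow branch's sink materializes a running total, so the run can be checked.
      val slowSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

      val graph = RunnableGraph.fromGraph(GraphDSL.create(slowSink) { implicit builder => slowOut =>
        import GraphDSL.Implicits._

        // Assumption: a finite source stands in for Source.tick so the demo ends.
        val batchedIn = Source(1 to 12)
          .map { i => println(s"Produced $i"); i }
          .batch(4, (i: Int) => i) { (acc, v) => println(s"Batched ${acc + v}"); acc + v }

        val bcast = builder.add(Broadcast[Int](2))
        val fastOut = Sink.foreach[Int](i => println(s"fast $i"))

        // Slow stage behind an async boundary whose input buffer is one element.
        val slowFlow = Flow[Int]
          .map { i => Thread.sleep(200); i }
          .async
          .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

        batchedIn ~> bcast ~> fastOut
                     bcast ~> slowFlow ~> slowOut
        ClosedShape
      })

      Await.result(graph.run(), 30.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"slow branch total: ${sumSeenBySlowBranch()}")
}
```

    Batching only changes how the values are grouped, not their sum, so the slow branch always totals 1 + ... + 12. With the one-element buffer I would expect "Batched ..." lines to show up after only a couple of elements; removing the addAttributes line makes a useful comparison. The exact interleaving of the printed lines will vary between runs.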

  • You need mapAsync instead of async:

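    // In addition to the imports from the question:
    import akka.stream.OverflowStrategy
    import scala.concurrent.Future
    import scala.concurrent.duration._

    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import akka.stream.scaladsl.GraphDSL.Implicits._

      // Print inside the graph, so "Produced" reflects this stream's own demand
      val in = Source.tick(0.seconds, 1.second, 1).map { x => println(s"Produced $x"); x }

      val out = Sink.foreach[Int] { o => println(s"F2 processed $o") }
      val out2 = Sink.foreach[Int] { o => println(s"F4 processed $o") }

      val bcast = builder.add(Broadcast[Int](2))

      // Holds up to 4 elements, then backpressures upstream
      // (Source.tick skips ticks while there is no downstream demand)
      val batchedIn: Source[Int, Cancellable] = in.buffer(4, OverflowStrategy.backpressure)

      val f2 = Flow[Int].map(_ + 10)
      // The slow work runs in a Future; parallelism 1 allows one element in flight
      val f4 = Flow[Int].mapAsync(1) { i =>
        Future { println("F4 Started Processing"); Thread.sleep(2000); i }(system.dispatcher)
      }

      batchedIn ~> bcast ~> f2 ~> out
                   bcast ~> f4 ~> out2
      ClosedShape
    }).run()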