
Fold and reduce show non-deterministic behavior when run in parallel, why?


I'm trying to count occurrences of items using Akka Streams. The example below is a simplified version of what I have. I need two pipelines to work concurrently. For some reason, the printed results aren't correct.

Does anyone know why this happens? Am I missing something important regarding substreams?

/**
 * SIMPLE EXAMPLE
 */
object TestingObject {
  import akka.actor.ActorSystem
  import akka.stream._
  import akka.stream.scaladsl._
  import java.nio.file.Paths
  import akka.util.ByteString
  import counting._
  import graph_components._

  // implicit actor system
  implicit val system:ActorSystem = ActorSystem("Sys")

  def main(args: Array[String]): Unit = {

    val customFlow = Flow.fromGraph(GraphDSL.create() {
      implicit builder =>
        import GraphDSL.Implicits._


        // Components
        val A   = builder.add(Balance[(Int, Int)](2, waitForAllDownstreams = true));
        val B1   = builder.add(mergeCountFold.async);
        val B2   = builder.add(mergeCountFold.async);
        val C   = builder.add(Merge[(Int, Int)](2));
        val D   = builder.add(mergeCountReduce);

        // Graph
        A ~> B1 ~> C ~> D
        A ~> B2 ~> C

        FlowShape(A.in, D.out);
    })

    // Run
    Source(0 to 101)
      .groupBy(10, x => x % 4)
      .map(x => (x % 4, 1))
      .via(customFlow)
      .mergeSubstreams
      .to(Sink.foreach(println)).run();
  }

  def mergeCountReduce = Flow[(Int, Int)].reduce((l, r) => {
    println("REDUCING");
    (l._1, l._2 + r._2)
  })
  // The zero element is passed as an explicit tuple instead of relying on argument auto-tupling
  def mergeCountFold = Flow[(Int, Int)].fold[(Int, Int)]((0, 0))((l, r) => {
    println("FOLDING");
    (r._1, l._2 + r._2)
  })

}

Solution

  • Two observations:

    • mergeCountReduce will emit the first key it saw with the sum of the values seen (and will fail the stream if it didn't see any elements)
    • mergeCountFold will emit the last key it saw and the sum of the values seen (and will emit a key and value of zero if it didn't see any elements)

    (in both cases, though the key is always the same)
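The two observations can be checked without running a stream. The following is a minimal sketch that applies the same combining functions from the question to plain Scala `List`s as a stand-in for the stream operators. The object name `FoldVsReduceSketch` and the list-based signatures are assumptions made for illustration; the keys in the sample differ on purpose so that "first key" and "last key" are distinguishable.

```scala
import scala.util.Try

object FoldVsReduceSketch {
  type Pair = (Int, Int) // (key, count)

  // Same combining function as mergeCountReduce, applied to a List.
  // Wrapped in Try because reducing an empty collection throws.
  def mergeCountReduce(xs: List[Pair]): Try[Pair] =
    Try(xs.reduce((l, r) => (l._1, l._2 + r._2)))

  // Same combining function and zero element as mergeCountFold.
  def mergeCountFold(xs: List[Pair]): Pair =
    xs.foldLeft((0, 0))((l, r) => (r._1, l._2 + r._2))

  def main(args: Array[String]): Unit = {
    val sample = List((1, 1), (2, 1), (3, 1))
    println(mergeCountReduce(sample))        // Success((1,3)): first key, summed counts
    println(mergeCountFold(sample))          // (3,3): last key, summed counts
    println(mergeCountReduce(Nil).isFailure) // true: nothing to reduce
    println(mergeCountFold(Nil))             // (0,0): only the zero element
  }
}
```

A collection that throws on an empty `reduce` is only an analogy here: in a stream the failure is signalled downstream rather than thrown at the call site.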

    Neither of those observations is affected by the async boundary.

    In the context of the preceding Balance operator, though, async introduces an implicit buffer, which keeps the graph it wraps from backpressuring until that buffer is full. Balance sends each stream value to the first output that isn't backpressuring, so unless the stage after Balance is dramatically slower than the upstream, Balance may send values to only one output (B1 in this case).
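To make the routing argument concrete, here is a toy model in plain Scala, not the actual Balance implementation. It assumes two outputs, each with a bounded buffer, and a router that always picks the first output with free space. The names `BalanceSketch`, `route`, `capacity` and `drainedPerElement` are hypothetical, and the capacity of 16 in the usage is an assumption about the buffer size, not something stated in the answer.

```scala
object BalanceSketch {
  // Returns how many elements each of the two outputs received.
  // capacity:          buffer slots per output (the implicit async buffer)
  // drainedPerElement: how many buffered elements each consumer processes
  //                    in the time one new element arrives from upstream
  def route(elements: Int, capacity: Int, drainedPerElement: Int): Vector[Int] = {
    // Without draining, the model can only hold 2 * capacity elements.
    require(drainedPerElement > 0 || elements <= 2 * capacity)
    val buffered = Array.fill(2)(0) // elements currently waiting per output
    val received = Array.fill(2)(0) // total elements routed per output

    def drain(): Unit =
      for (i <- buffered.indices)
        buffered(i) = math.max(0, buffered(i) - drainedPerElement)

    for (_ <- 1 to elements) {
      drain()
      // Both buffers full: upstream is backpressured until space frees up.
      while (!buffered.exists(_ < capacity)) drain()
      val target = buffered.indexWhere(_ < capacity) // first output with space
      buffered(target) += 1
      received(target) += 1
    }
    received.toVector
  }

  def main(args: Array[String]): Unit = {
    println(route(26, 16, 1)) // Vector(26, 0): consumer keeps up, second output starves
    println(route(26, 16, 0)) // Vector(16, 10): consumer stalled, overflow spills over
  }
}
```

In this model the second output only receives elements once the first buffer is full, which matches the scenario described above where B2 sees nothing.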

    In that scenario, if B1 and B2 used reduce, B1 would emit the key and count, while B2, having seen no elements, would fail, causing the whole stream to fail.

    For fold, in that scenario, B1 would emit the key and count, while B2, not having seen any values, would emit (0,0). The merge would pass them on in whichever order they arrive (it is reasonable to assume a 50/50 chance), so the final reduce in D, which keeps the first key it sees, would emit either the key and the count or zero and the count.
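The effect of the arrival order can be sketched with the final combining function applied to a two-element `List`. This is an illustration under the scenario above, not a trace of an actual run: `MergeOrderSketch` and `finalReduce` are hypothetical names, and the pair `(3, 25)` assumes the substream for key 3, which occurs 25 times in `0 to 101`.

```scala
object MergeOrderSketch {
  // Same combining function as mergeCountReduce (stage D in the question):
  // keeps the key of the first element and sums the counts.
  def finalReduce(arrivals: List[(Int, Int)]): (Int, Int) =
    arrivals.reduce((l, r) => (l._1, l._2 + r._2))

  def main(args: Array[String]): Unit = {
    val fromB1 = (3, 25) // B1 saw every element of the substream
    val fromB2 = (0, 0)  // B2 saw nothing and emitted the fold's zero element

    println(finalReduce(List(fromB1, fromB2))) // (3,25): B1 arrived first
    println(finalReduce(List(fromB2, fromB1))) // (0,25): B2 arrived first, key lost
  }
}
```

For the substream with key 0, both orders produce the same pair, so the problem is only visible for the other keys.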