Akka Stream Graph parallelisation


I've created a Graph which contains a Balance. This Balance distributes the load over 5 Flows. I expected every instance of my Flow to run on a separate Thread. However, this is not what happens. When I print the Thread name, I notice that all Flows are executed on the same Thread.

The code I'm using is:

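import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._ // brings the ~> operator into scope

  val in = Source(1 to 10)
  val out = Sink.ignore

  val bal = builder.add(Balance[Int](5))
  val merge = builder.add(Merge[Int](5))

  // Five flows, each printing the thread it runs on and marked as async
  val f1, f2, f3, f4, f5 = Flow[Int].map(x => {
    println(Thread.currentThread())
    x
  }).async

  in ~> bal ~> f1 ~> merge ~> out
  bal ~> f2 ~> merge
  bal ~> f3 ~> merge
  bal ~> f4 ~> merge
  bal ~> f5 ~> merge

  ClosedShape
})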

This outputs:

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

My expectation was that the output would be something along the lines of:

Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

How can I change this code sample so that the Flows are executed in parallel?


Solution

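  • The async directive does not guarantee that your stages will run on separate threads. It only introduces an asynchronous boundary, so each stage can be scheduled independently on the dispatcher's thread pool. As long as the stages do not overlap in time, they may all run on the same thread.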

    For your specific case, the executed steps might be the following:

    • merge requests an element on the 1st inlet
    • balance serves an element through the 1st flow
    • merge requests an element on the 2nd inlet
    • balance serves an element through the 2nd flow
    • etc.

    Now, if you change your Balance as follows:

    val bal = builder.add(Balance[Int](5, waitForAllDownstreams = true))
    

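    Balance will wait until all five outputs have signalled demand before it emits anything, so the five flows become active at about the same time and are likely to be scheduled on five different threads. The steps would be: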

    • merge requests an element on the 1st inlet
    • merge requests an element on the 2nd inlet
    • merge requests an element on the 3rd inlet
    • merge requests an element on the 4th inlet
    • merge requests an element on the 5th inlet
    • balance starts serving elements through all flows
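
    To compare the two settings, a minimal sketch along these lines could be used. It assumes Akka 2.6 or later, where an implicit ActorSystem also supplies the materializer. The names BalanceThreadsDemo, runGraph and worker are made up for illustration, and Sink.ignore is swapped for Sink.seq so that the thread names can be collected instead of printed. How many distinct threads show up depends on the dispatcher and on timing, so neither setting guarantees a particular count.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BalanceThreadsDemo {

  // Runs the graph from the question and returns (element, thread name) pairs.
  // `waitForAll` is passed straight to Balance's waitForAllDownstreams flag.
  def runGraph(waitForAll: Boolean)(implicit system: ActorSystem): Seq[(Int, String)] = {
    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[(Int, String)]) { implicit builder => out =>
        import GraphDSL.Implicits._

        val bal = builder.add(Balance[Int](5, waitForAllDownstreams = waitForAll))
        val merge = builder.add(Merge[(Int, String)](5))

        // Each call creates a flow with its own asynchronous boundary.
        def worker = Flow[Int].map(x => (x, Thread.currentThread().getName)).async

        Source(1 to 10) ~> bal
        for (i <- 0 until 5) bal.out(i) ~> worker ~> merge.in(i)
        merge.out ~> out

        ClosedShape
      })
    // Blocking here is only for the demo; the stream itself completes on its own.
    Await.result(graph.run(), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Stream_PoC")
    try {
      for (flag <- Seq(false, true)) {
        val results = runGraph(flag)
        val threads = results.map(_._2).distinct.size
        println(s"waitForAllDownstreams=$flag: $threads distinct thread(s)")
      }
    } finally system.terminate()
  }
}
```

    Because the map stage here does almost no work, the flows may still finish without overlapping, even with waitForAllDownstreams = true. Stages that take longer per element are more likely to end up on different threads.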