
Does groupBy in Akka Streams create substreams that run in parallel?


I created this example of groupBy -> map -> mergeSubstreamsWithParallelism using Akka Streams. The course I am taking says that groupBy creates up to X substreams, where X is the parameter I pass to it, and that I then have to merge the substreams back into a single stream. So my understanding is that the map operator runs in parallel. Is that right?

If so, why do I see the same thread executing the map operator in this code:

val textSource = Source(List(
  "I love Akka streams", // odd
  "this has even characters", // even
  "this is amazing", // odd
  "learning Akka at the Rock the JVM", // odd
  "Let's rock the JVM", // even
  "123", // odd
  "1234" // even
))
val totalCharCountFuture = textSource
  .groupBy(2, string => string.length % 2)
  .map { c =>
    println(s"I am running on thread [${Thread.currentThread().getId}]")
    c.length
  }// .async // this operator runs in parallel
  .mergeSubstreamsWithParallelism(2)
  .toMat(Sink.reduce[Int](_ + _))(Keep.right)
  .run()
totalCharCountFuture.onComplete {
  case Success(value) => println(s"total char count: $value")
  case Failure(exception) => println(s"failed computation: $exception")
}

The output:

I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
total char count: 116

Then I added .async to make the operator run asynchronously. Now my output shows different threads executing the map operator:

I am running on thread [21]
I am running on thread [21]
I am running on thread [21]
I am running on thread [20]
I am running on thread [20]
I am running on thread [20]
I am running on thread [20]

I read what the Akka docs say about the async boundary:

Put an asynchronous boundary around this Flow. If this is a SubFlow (created e.g. by groupBy), this creates an asynchronous boundary around each materialized sub-flow, not the super-flow. That way, the super-flow will communicate with sub-flows asynchronously.

So, do I need .async after groupBy to ensure that all substreams execute in parallel or not? Is the test I am doing a valid way to check the parallelism of an operator in Akka Streams?

Thanks


Solution

  • So, do I need .async after groupBy to ensure that all substreams execute in parallel or not? Is the test I am doing a valid way to check the parallelism of an operator in Akka Streams?

    The short answer is "yes", you need async.

    As a rule of thumb in Akka Streams (and in other implementations of the Reactive Streams spec, like RxJava or Project Reactor), you need to explicitly demarcate async boundaries. By default a stream is executed single-threaded (or by a single actor, in the case of Akka Streams). That includes operators like groupBy.

    This may seem a bit counterintuitive at first, but when you think about it, parallel execution is not really required by groupBy semantics. Often you do want parallel execution, because it is the very reason you apply groupBy, be it to use all available cores for some computation or to call an external service in parallel and get better throughput. In those cases you need to explicitly code for that parallelism to occur. One way is to use async, as you did in your example, where the stream execution logic itself introduces the parallelism. Alternatively, you could use mapAsync, where the parallelism is introduced by some means external to the stream execution logic.
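To make the two options above concrete, here is a minimal sketch of both, reusing the strings from the question. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream. The object name `GroupByParallelism` and the method names `withAsync` and `withMapAsync` are made up for illustration, and the Future in the second variant simply runs on the actor system's default dispatcher.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical names throughout; a sketch, not a reference implementation.
object GroupByParallelism {

  val lines: List[String] = List(
    "I love Akka streams",
    "this has even characters",
    "this is amazing",
    "learning Akka at the Rock the JVM",
    "Let's rock the JVM",
    "123",
    "1234"
  )

  // Option 1: an async boundary around each substream, so the two
  // substreams can be executed by different actors (and threads).
  def withAsync()(implicit system: ActorSystem): Future[Int] =
    Source(lines)
      .groupBy(2, _.length % 2)
      .map(_.length)
      .async
      .mergeSubstreams
      .runWith(Sink.reduce[Int](_ + _))

  // Option 2: no groupBy at all; mapAsync keeps up to two Futures in
  // flight, and the work runs wherever those Futures are scheduled.
  def withMapAsync()(implicit system: ActorSystem): Future[Int] = {
    implicit val ec: ExecutionContext = system.dispatcher
    Source(lines)
      .mapAsync(parallelism = 2)(s => Future(s.length))
      .runWith(Sink.reduce[Int](_ + _))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("GroupByParallelism")
    try {
      println(s"async:    ${Await.result(withAsync(), 5.seconds)}")
      println(s"mapAsync: ${Await.result(withMapAsync(), 5.seconds)}")
    } finally {
      system.terminate()
    }
  }
}
```

Both variants should produce the same total as the original code (116), since only the execution model changes, not the result. Which one fits better depends on where the cost is: async is useful when the stream stages themselves are CPU-heavy, while mapAsync is the usual choice for calls that already return a Future, such as requests to an external service.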