Tags: scala, concurrency, akka, akka-stream

How to make fan-out akka-streams processing concurrent


I'm attempting to use Akka Streams to concurrently process a series of dependent streams.

Something like this:

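import akka.stream.scaladsl.{Sink, Source}

// getNextStream(s, delayMs) is defined in the linked repo: it returns a Future of a collection
// whose elements expose an `x` field. An implicit ActorSystem/Materializer is assumed in scope.
val concurrency = 2
val start = System.currentTimeMillis() // reference point for the elapsed-time printout
Source(
  (1 to 5).toStream.map(i => {
    println(s"1: Emitting $i")
    i.toString
  }))
  .mapAsyncUnordered(concurrency)(s => getNextStream(s, 25))
  .mapConcat(identity)
  .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 50))
  .mapConcat(identity)
  .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 100))
  .mapConcat(identity)
  .map(x => println(s"4: Received $x after ${System.currentTimeMillis() - start}"))
  .runWith(Sink.ignore)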

My problem is that it doesn't appear to run concurrently: raising the concurrency value above 2 has no effect. I suspect that mapConcat is serializing the processing, but I'm not sure.

A full, runnable example of the problem can be found here: https://github.com/realrunner/akka-stream-example.

Currently, the code takes 11 seconds to complete. I could easily cut that down using raw actors, but without the benefit of proper backpressure handling. Any ideas on how to make this more concurrent?


Solution

  • Every one of these getNextStream calls uses ask against a single (singleton) actor. Keep in mind that an actor processes incoming messages serially, one at a time.

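    Now, while processing each message, you block the actor with Thread.sleep. The asks therefore queue up behind one another, and raising the concurrency argument only puts more requests into the same mailbox instead of running them in parallel. Blocking is generally discouraged within Akka; see this bit of the docs.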

    Depending on the real behaviour of your actors, you can simulate long processing in a non-blocking way using the after pattern (see docs) or, if blocking is really needed (but double- and triple-check that), you can block on a dedicated dispatcher (as explained here).
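Here is a minimal sketch of the non-blocking option. It assumes Akka 2.6, where an implicit ActorSystem also supplies the stream Materializer. The case class Out, the fan-out of two children per element, and this getNextStream are hypothetical stand-ins for the code in the linked repo. The delay comes from akka.pattern.after on the system scheduler instead of Thread.sleep inside an actor. Because no thread is held while waiting, each mapAsyncUnordered stage can keep up to `concurrency` requests in flight.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object NonBlockingFanOut {
  // Hypothetical element type standing in for the repo's output type.
  final case class Out(x: String)

  def run(concurrency: Int): Seq[Out] = {
    implicit val system: ActorSystem = ActorSystem("fan-out")
    implicit val ec: ExecutionContext = system.dispatcher

    // Hypothetical getNextStream: completes after `delayMs` via the scheduler
    // (no thread sleeps), then fans out into two children.
    def getNextStream(s: String, delayMs: Int): Future[List[Out]] =
      after(delayMs.millis, system.scheduler) {
        Future.successful(List(Out(s"$s.a"), Out(s"$s.b")))
      }

    try {
      val done = Source(1 to 5)
        .map(_.toString)
        .mapAsyncUnordered(concurrency)(s => getNextStream(s, 25))
        .mapConcat(identity)
        .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 50))
        .mapConcat(identity)
        .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 100))
        .mapConcat(identity)
        .runWith(Sink.seq)
      Await.result(done, 30.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    val results = run(concurrency = 8)
    // 5 roots x 2 x 2 x 2 children = 40 elements
    println(s"${results.size} elements after ${System.currentTimeMillis() - start} ms")
  }
}
```

With this shape, changing the concurrency argument should visibly change the elapsed time, because the waiting happens on the scheduler rather than inside one serialized actor. The exact timings will vary by machine.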
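If the work really must block, here is a sketch of the dedicated-dispatcher option. It again assumes Akka 2.6 and the same hypothetical Out and getNextStream shapes. This sketch drops the actor and wraps the blocking call in a Future that runs on a separate dispatcher. The dispatcher name my-blocking-dispatcher and its pool size are illustrative assumptions, modelled on the thread-pool dispatcher configuration shown in the Akka documentation. Thread.sleep then occupies only threads from that dedicated pool, not the default dispatcher that runs the stream.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object BlockingOnDedicatedDispatcher {
  // Hypothetical element type standing in for the repo's output type.
  final case class Out(x: String)

  // Illustrative dispatcher config: a fixed pool reserved for blocking calls.
  private val config = ConfigFactory.parseString(
    """
      |my-blocking-dispatcher {
      |  type = Dispatcher
      |  executor = "thread-pool-executor"
      |  thread-pool-executor { fixed-pool-size = 16 }
      |  throughput = 1
      |}
      |""".stripMargin).withFallback(ConfigFactory.load())

  def run(concurrency: Int): Seq[Out] = {
    implicit val system: ActorSystem = ActorSystem("blocking-demo", config)
    val blockingEc: ExecutionContext = system.dispatchers.lookup("my-blocking-dispatcher")

    // Hypothetical blocking getNextStream: sleeps on the dedicated pool only.
    def getNextStream(s: String, delayMs: Int): Future[List[Out]] =
      Future {
        Thread.sleep(delayMs.toLong)
        List(Out(s"$s.a"), Out(s"$s.b"))
      }(blockingEc)

    try {
      val done = Source(1 to 5)
        .map(_.toString)
        .mapAsyncUnordered(concurrency)(s => getNextStream(s, 25))
        .mapConcat(identity)
        .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 50))
        .mapConcat(identity)
        .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 100))
        .mapConcat(identity)
        .runWith(Sink.seq)
      Await.result(done, 30.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    val results = run(concurrency = 8)
    println(s"${results.size} elements after ${System.currentTimeMillis() - start} ms")
  }
}
```

The pool size caps how many blocking calls can actually run at once. It is worth sizing it together with the concurrency arguments; otherwise extra in-flight requests simply wait for a free thread.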