I'm attempting to use Akka Streams to concurrently process a series of dependent streams.
Something like this:
val concurrency = 2
Source(
  (1 to 5).toStream.map { i =>
    println(s"1: Emitting $i")
    i.toString
  })
  .mapAsyncUnordered(concurrency)(s => getNextStream(s, 25))
  .mapConcat(identity)
  .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 50))
  .mapConcat(identity)
  .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 100))
  .mapConcat(identity)
  .map(x => println(s"4: Received $x after ${System.currentTimeMillis() - start}"))
  .runWith(Sink.ignore)
My problem is that it doesn't appear to be running concurrently: raising the concurrency variable above 2 has no effect. I suspect that mapConcat is serializing the processing, but I'm not sure.
A full, runnable example of the problem can be found here: https://github.com/realrunner/akka-stream-example.
Currently, the code takes 11 seconds to complete. I could easily cut it down using raw Actors, without the benefit of properly handling backpressure. Any ideas on how to make this more concurrent?
Each and every one of these getNextStream calls goes out and uses ask against one single(ton) actor. Keep in mind that an actor processes its incoming messages one at a time, in a serialized manner.
Then, while processing each message, you block that actor with Thread.sleep. Blocking is generally discouraged within Akka - see this bit of the docs.
Depending on the real behaviour of your actors, you can simulate long processing in a non-blocking way using the after pattern (see docs), or, if blocking is really needed (but double and triple check that), you can block on a dedicated dispatcher (as explained here).
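As a sketch of the non-blocking approach: in Akka itself you would reach for akka.pattern.after with the system scheduler, but the same idea can be shown with plain Scala Futures and a ScheduledExecutorService. The NonBlockingDelay name here is purely illustrative, not from the original code:

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object NonBlockingDelay {
  // A single daemon timer thread schedules completions; no caller
  // thread is ever parked with Thread.sleep.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(
    new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r)
        t.setDaemon(true)
        t
      }
    })

  // Complete the returned Future with `value` after `delay`,
  // mirroring what akka.pattern.after does with the system scheduler.
  def after[T](delay: FiniteDuration)(value: => T): Future[T] = {
    val p = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit = p.complete(Try(value))
    }, delay.toMillis, TimeUnit.MILLISECONDS)
    p.future
  }
}
```

With something like this in place, getNextStream could return a Future completed after a delay instead of sleeping inside the actor, and mapAsyncUnordered would then genuinely have `concurrency` calls in flight at once.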