I am comparing the difference between `mapAsync` and `async`:
```scala
import java.util.concurrent.Executors

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{ExecutionContext, Future}

object Demo3 extends App {
  implicit val system = ActorSystem("MyDemo")
  implicit val materializer = ActorMaterializer()

  private val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  private def test(a: Int) = {
    println(s"Flow A : ${Thread.currentThread().getName()}")
    Future(println(a + 1))(ec)
  }

  Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()
}
```
Output:

```
Flow A : MyDemo-akka.actor.default-dispatcher-2
2
Flow A : MyDemo-akka.actor.default-dispatcher-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-2
Flow A : MyDemo-akka.actor.default-dispatcher-2
4
Flow A : MyDemo-akka.actor.default-dispatcher-2
5
Flow A : MyDemo-akka.actor.default-dispatcher-2
6
Flow A : MyDemo-akka.actor.default-dispatcher-2
7
Flow A : MyDemo-akka.actor.default-dispatcher-2
8
Flow A : MyDemo-akka.actor.default-dispatcher-2
9
Flow A : MyDemo-akka.actor.default-dispatcher-2
10
11
```
Why, despite parallelism being set to 10, does it show only one thread name? Is it not running asynchronously?

When I replace that line with `Source(1 to 100).map(test).async.to(Sink.ignore).run()`, I get the output below. Do `mapAsync` and `async` use a single thread every time?

Output:
```
Flow A : MyDemo-akka.actor.default-dispatcher-4
2
Flow A : MyDemo-akka.actor.default-dispatcher-4
3
Flow A : MyDemo-akka.actor.default-dispatcher-4
4
Flow A : MyDemo-akka.actor.default-dispatcher-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-4
6
Flow A : MyDemo-akka.actor.default-dispatcher-4
7
Flow A : MyDemo-akka.actor.default-dispatcher-4
8
Flow A : MyDemo-akka.actor.default-dispatcher-4
9
Flow A : MyDemo-akka.actor.default-dispatcher-4
10
Flow A : MyDemo-akka.actor.default-dispatcher-4
11
```
In `test`, the `println` that prints the thread ID is executed outside of the future, therefore it's executed synchronously. The code inside the future will get executed on a thread of the `ExecutionContext` (in this case the fixed thread pool behind `ec`). It's worth noting that some parallel execution happened: the thread print for `a = 4` happened before the `a + 1` print for `a = 3`.
If you move the thread `println` into the future, that `println` will execute asynchronously:

```scala
Future {
  println(s"Flow A : ${Thread.currentThread().getName()}")
  println(a + 1)
}(ec)
```
Note that in your test code, you're not likely to see much parallel execution anyway: the amount of work involved in spawning the future is close to the amount of work done in the future (even with the second print in the future), so the spawned futures often complete before the next future can spawn.
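To see the pool actually fan out, the futures need to do enough work. Here's a minimal sketch using only the Scala standard library (no Akka; `ThreadDemo`, the pool size of 4, and the 200 ms sleep are all arbitrary choices of mine, not from your code) in which blocking futures are spread across a fixed pool:

```scala
import java.util.concurrent.Executors

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object ThreadDemo {
  // Run `tasks` blocking futures on a 4-thread pool and report how many
  // distinct pool threads actually executed them.
  def distinctThreadCount(tasks: Int): Int = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    val futures = (1 to tasks).map { _ =>
      Future {
        Thread.sleep(200) // enough work that the pool must hand tasks to several threads
        Thread.currentThread().getName
      }
    }

    val names = Await.result(Future.sequence(futures), 5.seconds)
    pool.shutdown()
    names.distinct.size
  }

  def main(args: Array[String]): Unit =
    println(s"distinct threads used: ${distinctThreadCount(8)}")
}
```

With 8 tasks each sleeping 200 ms on a 4-thread pool, several threads show up at once; in your original code the futures finish almost instantly, so they rarely overlap.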
`mapAsync` can best be thought of as synchronously calling code which returns a future (the future may or may not be completed at the time it's returned) and storing that future in a buffer of size `parallelism`. When a future in that buffer completes successfully, the value it completed with is emitted and the slot in the buffer frees up, allowing `mapAsync` to demand another element. (I'm technically describing `mapAsyncUnordered` because it's simpler: `mapAsync` won't emit until every future created prior to the completed one has successfully completed and emitted; I don't actually know off the top of my head whether a later element completing opens up a slot in the buffer or not.) Whether this actually results in parallelism depends on the particulars of the future and how it's completed (e.g. if the future is an ask of the same actor every time, the effective parallelism is unlikely to ever be more than 1).
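That ordering rule can be illustrated with a toy model in plain Scala (this is my sketch, not Akka's implementation; `MapAsyncModel` and `mapAsyncOrdered` are invented names): up to `parallelism` futures are in flight at once, but emission always blocks on the head of the buffer, so results come out in input order even when later futures finish first.

```scala
import java.util.concurrent.Executors

import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object MapAsyncModel {
  // A simplified picture of mapAsync's ordered variant: keep up to
  // `parallelism` futures in a buffer, emit strictly in arrival order.
  def mapAsyncOrdered[A, B](parallelism: Int)(in: Iterator[A])(f: A => Future[B])
                           (implicit ec: ExecutionContext): Iterator[B] = new Iterator[B] {
    private val buffer = mutable.Queue.empty[Future[B]]

    // Top up the buffer: demand elements while there are free slots.
    private def fill(): Unit =
      while (buffer.size < parallelism && in.hasNext) buffer.enqueue(f(in.next()))

    def hasNext: Boolean = { fill(); buffer.nonEmpty }

    def next(): B = {
      fill()
      // Block on the *head* future: later futures may already be done,
      // but their values stay buffered until earlier ones have emitted.
      Await.result(buffer.dequeue(), 10.seconds)
    }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    val out = mapAsyncOrdered(4)(Iterator(300, 100, 200)) { ms =>
      Future { Thread.sleep(ms.toLong); ms } // completes out of order...
    }
    println(out.toList) // ...but emits in input order: List(300, 100, 200)

    pool.shutdown()
  }
}
```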
`async` probably should have been called `stageBoundary` or something similar, in my opinion, precisely because the current name often leads people to think that `mapAsync` and `map(...).async` have much, if anything, in common. `async` is a signal to the `Materializer` that the stages between the previous `async` and this one should not be fused with the stages after it. In the usual `ActorMaterializer`, fused stages execute in a single actor. This has the advantage of eliminating the overhead of transferring elements from stage to stage, at the cost of limiting, in general, the number of elements executing in a fused stage to 1. There's an implicit buffer between fused stages: the downstream stage will signal demand based on the empty slots in its buffer. The two stages will process in parallel, in the sense that the (possibly fused) stage before the `async` can process an element at the same time that the (possibly fused) stage after the `async` is processing an element. This is basically pipelined execution.
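That pipelining can be sketched with two plain threads and a bounded handoff queue standing in for the inter-stage buffer (again a toy model of mine, not Akka internals; `AsyncBoundaryModel`, the buffer size of 2, and the `-1` end-of-stream marker are all assumptions for the sketch):

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch}

object AsyncBoundaryModel {
  // Two independently running "stages" separated by a bounded buffer:
  // the upstream stage can work on the next element while the downstream
  // stage is still processing the previous one -- pipelined execution.
  def run(elements: Int): List[Int] = {
    val boundary = new ArrayBlockingQueue[Int](2) // the implicit inter-stage buffer
    val received = List.newBuilder[Int]
    val done = new CountDownLatch(1)

    val upstream = new Thread(() => {
      (1 to elements).foreach { n =>
        println(s"stage A processing $n on ${Thread.currentThread().getName}")
        boundary.put(n) // blocks when the buffer is full: backpressure
      }
      boundary.put(-1) // end-of-stream marker for this sketch
    })

    val downstream = new Thread(() => {
      Iterator.continually(boundary.take()).takeWhile(_ != -1).foreach { n =>
        println(s"stage B processing $n on ${Thread.currentThread().getName}")
        received += n
      }
      done.countDown()
    })

    upstream.start()
    downstream.start()
    done.await()
    received.result()
  }

  def main(args: Array[String]): Unit = run(5)
}
```

The interleaved "stage A" and "stage B" lines show both threads active at once, and the bounded queue is what keeps stage A from racing arbitrarily far ahead of stage B.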
So in `Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()`, the entire stream is materialized as a single actor (effectively: this is a description which complies with the requirements of how streams are materialized), so all of the following, except for tasks scheduled onto an `ExecutionContext`, execute synchronously, in order:
- `Sink.ignore` signals effectively unbounded demand to `mapAsync`
- `mapAsync` has 10 empty slots, so it requests 10 elements from the source
- the source emits 1
- `mapAsync` prints the current thread, creates a `Promise[Unit]`, creates a closure object which looks something like `new ClosureClassName { val a: Int = 1; def run(): Unit = println(a + 1) }`, and schedules a task on `ec` which will run the `run` method of the closure and complete the promise's future. The `ec` will schedule the task on a thread for asynchronous execution according to its own logic; meanwhile our actor saves the future in its buffer (call it `futureBuffer(0)`)
- when the actor checks `futureBuffer(0)`, it's completed (with `()`, the singleton value for `Unit`), so `mapAsync` emits `()` to `Sink.ignore` and clears `futureBuffer(0)`
- `Sink.ignore`, well, ignores the `()` it's received
- the source now emits 2
- `mapAsync` executes as above, only with `a = 2` in the closure
- `futureBuffer(0)` (now the future for `a = 2`) hasn't yet completed, so...
- the source now emits 3
- `mapAsync` executes as above, with `a = 3` in the closure, saving the future as `futureBuffer(1)`
- `futureBuffer(0)` and `futureBuffer(1)` have completed, so `futureBuffer(0)`'s value is emitted to `Sink.ignore` and `futureBuffer(0)` is cleared
- `Sink.ignore` ignores the value
- `futureBuffer(1)`'s value is emitted to `Sink.ignore` and `futureBuffer(1)` is cleared
- `Sink.ignore` ignores the value

So there's been a tiny bit of parallelism through `mapAsync`: the realized degree of parallelism is essentially the number of uncompleted futures.
For `Source(1 to 100).map(test).async.to(Sink.ignore).run()`, that will materialize as something like:

```
Actor A (Source.map)
  ^
  | Future[Unit] sent down, demand signal sent up
  v
Actor B (Sink.ignore)
```
Let's say that the materializer settings have a receiving buffer of 2 elements per actor.
- `Sink.ignore` signals effectively unbounded demand to Actor B
- B has 2 free slots in its buffer, so it sends a message to Actor A demanding 3 elements
- A passes this demand to `map`, which demands 1 element from the source
- `map` prints the current thread etc. as above. It doesn't save the resulting (possibly completed or uncompleted) future in a buffer (it doesn't have one); it emits the future to A
- A passes the future to B
- from here on, A and B are processing in parallel (at least some of the time in this case, since B is just sending elements to the bit bucket)
- B passes the future it's received to `Sink.ignore`, which ignores the future (not even caring whether the future has completed, or even whether the future failed)
- and so on... once B has received three elements, it will signal demand for two more (assuming, as is likely, that the `Sink` hasn't yet finished ignoring the future and the buffer of 2 elements is empty)
It's worth noting throughout all this that an actor may change, from message to message, which thread it's running on (whether it does is up to the `ActorSystem`'s dispatcher), but an actor maintains a single-thread illusion: it's only ever using one thread at a time.