I tested whether a simple async flow actually runs asynchronously and I'm surprised it doesn't. Do I need some additional configuration?
@Configuration
class StreamingConfiguration
{
    @Bean
    Materializer materializer(ActorSystem actorSystem)
    {
        return ActorMaterializer.create(actorSystem);
    }

    @PostConstruct
    public void test(Materializer materializer)
    {
        var takePart = Flow.of(String.class).map(path -> {
            var start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < 3000) {}
            return path;
        });

        Source.from(Lists.newArrayList("A", "B", "C", "D"))
              .via(takePart.async())
              .toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
              .run(materializer)
              .toCompletableFuture()
              .join();
    }
}
I can see the materializer has the default fork-join-pool dispatcher.
EDIT: Sorry, but your example also doesn't work. It still takes ~12 seconds to finish when using mapAsync. I tried flatMapMerge with the same result :/
Function<String, CompletionStage<String>> blocking = s -> {
    try
    {
        Thread.sleep(3000);
    } catch (InterruptedException e)
    {
        e.printStackTrace();
    }
    return CompletableFuture.completedFuture(s);
};

Source.from(List.of("A", "B", "C", "D"))
      .mapAsync(4, blocking)
      .toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
      .run(actorSystem)
      .toCompletableFuture()
      .join();
Akka Streams by default materializes stream stages into a single actor: this avoids the overhead of passing messages between stream stages but it does mean that the second element of the stream won't be consumed until the first element has worked its way through the stream.
The async operator in a stream means that the stream up to that point will be executed in its own actor. In your example code:
- Source will be an actor
- the takePart flow will be an actor
- Sink will be an actor

Each of these will still not allow more than one element to be in process at a time: the gain over not having async is that the Source and Sink can have an element in process at the same time as takePart has an element in process. There's also a small implicit buffer in downstream stages to improve throughput, but that can often be ignored.
In this stream, the takePart stage takes 3 seconds to process an element and the Source and Sink take a few microseconds (for the sake of illustration, we'll say that the Source takes 5 microseconds and the Sink takes 15 microseconds). So the rough chronology is (ignoring the buffer):
1. takePart signals demand to Source
2. Source emits A to takePart
3. takePart emits A to Sink, signals demand to Source
4. Source emits B to takePart
5. Sink processes A, signals demand to takePart
6. takePart emits B to Sink, signals demand to Source
7. Source emits C to takePart
8. Sink processes B, signals demand to takePart
9. takePart emits C to Sink, signals demand to Source
10. Source emits D to takePart
11. Sink processes C, signals demand to takePart
12. takePart emits D to Sink, signals demand to Source; Source completes, takePart completes
13. Sink processes D, completes

Absent the async, the stream would complete in 4 * (3 sec + 20 us), so the async saved 45 us (cumulatively, async in this stream would save 15 us for every element after the first): not much of a gain. A pipelined stream at full utilization has throughput gated by its slowest section (you can imagine a highway where the speed limit drops: if the traffic is heavy enough to saturate the highway, the speed on the highway before the speed limit drop will be the speed limit after the drop): you get the best results if each side of the async processes elements at about the same rate.
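As a hypothetical sketch of that point (reusing the question's imports and actorSystem): if the 3 seconds of work could be split into two stages of about 1.5 seconds each with an async boundary between them, element n+1 could be in the first stage while element n is in the second, so four elements would finish in roughly 3 s + 3 * 1.5 s = 7.5 s instead of ~12 s.

var firstHalf = Flow.of(String.class).map(s -> {
    Thread.sleep(1500);   // first half of the per-element work (stand-in for real work)
    return s;
});
var secondHalf = Flow.of(String.class).map(s -> {
    Thread.sleep(1500);   // second half of the per-element work
    return s;
});

Source.from(List.of("A", "B", "C", "D"))
      .via(firstHalf.async())   // boundary: firstHalf and secondHalf run in separate actors
      .via(secondHalf)
      .runWith(Sink.ignore(), actorSystem)
      .toCompletableFuture()
      .join();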
There is, somewhat confusingly, another usage of "async" in the Akka Streams API, used to denote stages which communicate with asynchronous processes by obtaining Futures (Scala) or CompletionStages (Java): the process completing the Future/CompletionStage may run on a different thread, and the stream stage often includes some limit on the number of Futures/CompletionStages it will allow to be in flight at a time. mapAsync is an example of this.
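The key is that the CompletionStage has to be completed by work running somewhere other than the stream's own thread: if the function sleeps before returning an already-completed future, mapAsync has nothing to run in parallel. A rough Java sketch (assuming the same imports and actorSystem as the question's snippets), using CompletableFuture.supplyAsync so the sleep runs on the common pool; with parallelism 4 and enough threads, all four sleeps overlap and the stream finishes in roughly 3 seconds rather than 12:

Source.from(List.of("A", "B", "C", "D"))
      .mapAsync(4, s -> CompletableFuture.supplyAsync(() -> {
          try
          {
              Thread.sleep(3000);   // the blocking work now runs on the common pool, not the stream's thread
          } catch (InterruptedException e)
          {
              Thread.currentThread().interrupt();
          }
          return s;
      }))
      .toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
      .run(actorSystem)
      .toCompletableFuture()
      .join();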
In Scala (I am generally unfamiliar with the Java future APIs), this would be something like (ignoring setting an implicit ExecutionContext, etc.):
def blockOnElement(e: String): Future[String] = Future {
  Thread.sleep(3000)
  e
}

Source(List("A", "B", "C", "D"))
  .mapAsync(4)(blockOnElement)
  .runWith(Sink.fold("") { (acc, _) => acc })
In that, assuming sufficient (more than 4) threads in the dispatcher, the entire stream should finish (assuming the 5/15 us delays above) in about 3 seconds and 80 us (the Source and Sink will still combine to spend 20 us on every element).
In addition to @Alec's mention of flatMapMerge, it's often useful to run a substream in mapAsync by using Source.single and Sink.head: the materialized value of the sink will be a Future/CompletionStage of the output element, and mapAsync will in turn preserve ordering downstream (in contrast to flatMapMerge).
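A rough Java sketch of that pattern, reusing the question's setup (expensiveFlow here is a made-up stand-in for whatever flow does the real per-element work): each element becomes a one-element substream, and Sink.head materializes the CompletionStage that mapAsync needs, so up to four substreams run at once while output order is preserved.

var expensiveFlow = Flow.of(String.class).map(s -> {
    Thread.sleep(3000);   // stand-in for the real per-element processing
    return s;
});

Source.from(List.of("A", "B", "C", "D"))
      .mapAsync(4, s -> Source.single(s)
                              .via(expensiveFlow)
                              .runWith(Sink.head(), actorSystem))   // CompletionStage<String> per element
      .runWith(Sink.foreach(System.out::println), actorSystem)
      .toCompletableFuture()
      .join();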