Search code examples
javaakkaakka-stream

Akka Streams run flow asynchronously


I have tested simple async flow if it runs asynchronously and I'm suprised it's not. Do I need some additional configuration?

@Configuration
class StreamingConfiguration
{
 
    @Bean
    Materializer materializer(ActorSystem actorSystem)
    {
        return ActorMaterializer.create(actorSystem);
    }

    @PostConstruct
    public void test(Materializer materializer)
    {
        var takePart = Flow.of(String.class).map(path -> {
            var start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < 3000) {}
            return path;
        });

        Source.from(Lists.newArrayList("A", "B", "C", "D"))
            .via(takePart.async())
            .toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
            .run(materializer)
            .toCompletableFuture()
            .join();
    }
}

I can see materializer has default fork-join-pool dispatcher

EDIT: sorry but your example also doesn't work. It takes still 12~ seconds to finish while using mapAsync. I tried flatMapMerge with the same result : /

   Function<String, CompletionStage<String>> blocking = s -> {
            try
            {
                Thread.sleep(3000);

            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            return CompletableFuture.completedFuture(s);
        };


        Source.from(List.of("A", "B", "C", "D"))
                .mapAsync(4, blocking)
                .toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
                .run(actorSystem)
                .toCompletableFuture()
                .join();

Solution

  • Akka Streams by default materializes stream stages into a single actor: this avoids the overhead of passing messages between stream stages but it does mean that the second element of the stream won't be consumed until the first element has worked its way through the stream.

    The async operator in a stream means that the stream up to that will be executed in its own actor. In your example code:

    • The Source will be an actor
    • The takePart flow will be an actor
    • The Sink will be an actor

    Each of these will still not allow more than one element to be in process at a time: the gain over not having async is that the Source and Sink can have an element in process at the same time as takePart has an element in process. There's also a small implicit buffer in downstream stages to improve throughput, but that can often be ignored.

    In this stream, the takePart stage takes 3 seconds to process an element and the Source and Sink take a few microseconds (for the sake of illustration, we'll say that the Source takes 5 microseconds and the Sink takes 15 microseconds). So the rough chronology is (ignoring the buffer):

    • time 0: takePart signals demand to Source
    • time 5 us: Source emits A to takePart
    • time 3 seconds + 5 us: takePart emits A to Sink, signals demand to Source
    • time 3 seconds + 10 us: Source emits B to takePart
    • time 3 seconds + 20 us: Sink processes A, signals demand to takePart
    • time 6 seconds + 10 us: takePart emits B to Sink, signals demand to Source
    • time 6 seconds + 15 us: Source emits C to takePart
    • time 6 seconds + 25 us: Sink processes B, signals demand to takePart
    • time 9 seconds + 15 us: takePart emits C to Sink, signals demand to Source
    • time 9 seconds + 20 us: Source emits D to takePart
    • time 9 seconds + 30 us: Sink processes C, signals demand to takePart
    • time 12 seconds + 20 us: takePart emits D to Sink, signals demand to Source, Source completes, takePart completes
    • time 12 seconds + 35 us: Sink processes D, completes

    Absent the async, the stream would complete in 4 * (3 sec + 20 us), so the async saved 45 us (cumulatively, async in this stream would save 15 us for every element after the first), so not much of a gain. A pipelined stream at full utilization has throughput gated by the slowest section (you can imagine a highway where the speed limit drops: if the traffic is heavy enough to saturate the highway, the speed on the highway before the speed limit drop will be the speed limit after the drop): you get the best results if each side of the async processes elements at about the same rate.

    There is, somewhat confusingly, another usage of "async" in the Akka Streams API, used to denote stages which communicate with asynchronous processes by obtaining Futures (Scala) or CompletionStages (Java): the process completing the Future/CompletionStage may run on a different thread, and the stream stage often includes some limit on the number of Futures/CompletionStages it will allow to be in flight at a time. mapAsync is an example of this.

    In Scala (I am generally unfamiliar with the Java future APIs), this would be something like (ignoring setting an implicit ExecutionContext, etc.):

    def blockOnElement(e: String): Future[String] = Future {
      Thread.sleep(3000)
      e
    }
    
    Source(List("A", "B", "C", "D"))
      .mapAsync(4)(blockOnElement)
      .runWith(Sink.fold("") { (acc, _) => acc })
    

    In that, assuming sufficient (more than 4) threads in the dispatcher, the entire stream should finish (assuming the 5/15 us delays above) in about 3 seconds and 80 us (the Source and Sink will still combine to spend 20 us on every element.

    In addition to @Alec's mention of flatMapMerge, it's often useful to run a substream in mapAsync by using Source.single and Sink.head: the materialized value of the sink will be a Future/CompletionStage of the output element and the mapAsync will in turn preserve ordering downstream (in contrast to flatMapMerge).