Search code examples
javaakkaakka-stream

Akka Streams dropping messages?


I am new to Akka Streams. I use it in Java. (akka-stream_2.12, version: 2.5.14).

I wrote the following class:

package main;

import java.io.IOException;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;

public class AkkaTest {
    public static void main(String[] args) throws IOException, InterruptedException {
        final ActorSystem actorSystem = ActorSystem.create("VehicleSystem");
        final Materializer materializer = ActorMaterializer.create(actorSystem);

        SourceQueueWithComplete<Object> componentA_outPort1 = 
                Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
                    .to(Sink.foreach(str -> System.out.println(str)))
                    .run(materializer);

        for(int i=1; i<100000; i++)
            componentA_outPort1.offer("Akka rocks: " + i);

        System.in.read();
        actorSystem.terminate();
        System.out.println("Done.");
    }
}

I would expect the code to print 100000 messages, as this is the number of iterations. Instead, it just prints messages 1-101, and then messages starting at about 61000 (i.e "Akka rocks: 61000").

So most of the messages are not printed. Can you explain why?


Solution

  • First hint to the problem here is the fact that "Done." is not printed to the console at the end. Instead it is printed somewhere either at the beginning or between the "Akka rocks" prints.

    The reason for this is that SourceQueue.offer is asynchronous. It returns a CompletionStage and you are not waiting for its completion. The fact that some stream elements are "lost" can be explained instead by the method's documentation, specifically the following part:

    Additionally when using the backpressure overflowStrategy: - If the buffer is full the Future won't be completed until there is space in the buffer - Calling offer before the Future is completed in this case will return a failed Future

    You can verify this by doing:

    SourceQueueWithComplete<Object> componentA_outPort1 = 
        Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
            .to(Sink.foreach(str -> System.out.println(str)))
            .run(materializer);
    
    for (int i=1; i<100000; i++) {
      CompletionStage<QueueOfferResult> result = componentA_outPort1.offer("Akka rocks: " + i);
      System.out.println(result);
    }
    

    You will see a lot of these "scala.concurrent.java8.FuturesConvertersImpl$CF@39471dfa[Not completed]"

    In order to solve it, you should wait for the offer's CompletionStage to complete, effectively making the overall call synchronous, which seems to be your intention:

    SourceQueueWithComplete<Object> componentA_outPort1 = 
        Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
            .to(Sink.foreach(str -> System.out.println(str)))
            .run(materializer);
    
    for (int i=1; i<100000; i++) {
      componentA_outPort1.offer("Akka rocks: " + i).toCompletableFuture().join();
    }
    

    Still "Done" will not necessarily be printed at the end, because the completion of the offer only guarantees you that the queue accepted the element, not that it was completely processed. Also, keep in mind that the actorSystem.terminate() is also asynchronous.

    The above approach works for your case, but in some cases it may not be desirable to block the current thread. For a simple case like yours, it can be easily avoided by using a different Source:

    Source.range(1,  1000).map(i -> "Akka rocks: " + i)
    

    For more complex cases, consider other static methods of Source, like Source.from which takes an Iterable, or Source.fromIterator.