Search code examples
javaakkaakka-streamreactive-streams

Akka streams don't run when Source has large number of records


I'm trying to write a very simple introductory example of using Akka Streams. I'm attempting to basically create a stream that takes a range of integers as a source and filters out all the integers that are not prime, producing a stream of prime integers as its output.

The class that constructs the stream is rather simple; for that I have the following.

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import com.aparapi.Kernel;
import com.aparapi.Range;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PrimeStream {
    private final AverageRepository averageRepository = new AverageRepository();
    private final ActorSystem actorSystem;

    public PrimeStream(ActorSystem actorSystem) {
        this.actorSystem = actorSystem;
    }

    public Flow<Integer, Integer, NotUsed> filterPrimes() {
        return Flow.of(Integer.class).grouped(10000).mapConcat(PrimeKernel::filterPrimes).filter( v -> v != 0);
    }
}

When I run the following test, it works fine.

private final ActorSystem actorSystem = ActorSystem.create("Sys");

@Test
public void testStreams() {
    Flow<Integer, Integer, NotUsed> filterStream = new PrimeStream(actorSystem).filterPrimes();
    Source<Integer, NotUsed> flow = Source.range(10000000, 10001000).via(filterStream);
    flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem));
}

However, when I increase the range by a factor of x10 by changing the line in the test to the following, it no longer works.

Source<Integer, NotUsed> flow = Source.range(10000000, 10010000).via(filterStream);

Now when the test runs, no exceptions are thrown, no warnings. It simply runs, then exits, without displaying any text to the console at all.

Just to be extra certain that the problem wasn't in my primality test itself, I ran the test over the same range without using Akka Streams, and it runs fine. The following code runs without a problem.

@Test
public void testPlain() {
    List<Integer> in = IntStream.rangeClosed(10000000, 10010000).boxed().collect(Collectors.toList());
    List<Integer> out = PrimeKernel.filterPrimes(in);
    System.out.println(out);
}

Just for the sake of clarity, the primality test itself takes in a list of integers and sets any element in the list to 0 if it is not prime.

As suggested by @RamonJRomeroyVigil if i remove the mapConcat part all together but leave everythig the same it does, in fact, print out 10,000 integers. However If i leave everything the same but simply replace filterPrimes with a method that just returns the method parameter as is without touching it, then it doesnt print anything to the screen at all. I've also tried adding a println to the begining filterPrime to debug it. Whenever it doesnt print any output that includes the debugging statement. So no attempt is even made to call filterPrimes at all.


Solution

  • runForeach returns a CompletionStage, so if you want to see all the numbers getting printed then you have to await on the CompletionStage otherwise the test function returns and the program terminates without the CompletionStage getting completed.

    Example:

    flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem)).toCompletableFuture().join();