Akka Streams: a Merge stage sometimes pushes downstream only once all upstream sources have pushed to it


I have been experimenting with writing a custom Source in Java. Specifically, I wrote a Source that takes elements from a BlockingQueue. I'm aware of Source.queue, but I don't know how to get the materialized values when I connect several of those to a Merge stage. Anyway, here's the implementation:

public class TestingSource extends GraphStage<SourceShape<String>> {
    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public final Outlet<String> out = Outlet.create("TestingSource.out");
    private final SourceShape<String> shape = SourceShape.of(out);

    private final BlockingQueue<String> queue;
    private final String identifier;

    public TestingSource(BlockingQueue<String> queue, String identifier) {
        this.queue = queue;
        this.identifier = identifier;
    }

    @Override
    public SourceShape<String> shape() {
        return shape;
    }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new GraphStageLogic(shape()) {
            private AsyncCallback<BlockingQueue<String>> callBack;

            {
                setHandler(out, new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        String string = queue.poll();
                        if (string == null) {
                            System.out.println("TestingSource " + identifier + " no records in queue, invoking callback");
                            executor.submit(() -> callBack.invoke(queue)); // necessary, otherwise blocks upstream
                        } else {
                            System.out.println("TestingSource " + identifier + " found record during pull, pushing");
                            push(out, string);
                        }
                    }
                });
            }

            @Override
            public void preStart() {
                callBack = createAsyncCallback(queue -> {
                    String string = null;
                    while (string == null) {
                        Thread.sleep(100);
                        string = queue.poll();
                    }
                    push(out, string);
                    System.out.println("TestingSource " + identifier + " found record during callback, pushed");
                });
            }
        };
    }
}

This example works, so it seems that my Source is working properly:

private static void simpleStream() throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    Source.fromGraph(new TestingSource(queue, "source"))
            .to(Sink.foreach(record -> System.out.println(record)))
            .run(materializer);

    Thread.sleep(2500);
    queue.add("first");

    Thread.sleep(2500);
    queue.add("second");
}

I also wrote an example that connects two of the Sources to a Merge stage:

private static void simpleMerge() throws InterruptedException {
    BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
    BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();

    final RunnableGraph<?> result = RunnableGraph.fromGraph(GraphDSL.create(
            Sink.foreach(record -> System.out.println(record)),
            (builder, out) -> {
                final UniformFanInShape<String, String> merge =
                        builder.add(Merge.create(2));
                builder.from(builder.add(new TestingSource(queue1, "queue1")))
                        .toInlet(merge.in(0));
                builder.from(builder.add(new TestingSource(queue2, "queue2")))
                        .toInlet(merge.in(1));

                builder.from(merge.out())
                        .to(out);
                return ClosedShape.getInstance();
            }));
    result.run(materializer);

    Thread.sleep(2500);
    System.out.println("seeding first queue");
    queue1.add("first");

    Thread.sleep(2500);
    System.out.println("seeding second queue");
    queue2.add("second");
}

Sometimes this example works as I expect: it prints "first" right after the first queue is seeded (after the first 2.5-second sleep), and then prints "second" right after the second queue is seeded (after the second sleep).

However, sometimes (about 1 in 5 runs) it prints nothing until the second queue is seeded, then prints "second" and immediately afterwards prints "first". In other words, the Merge stage pushes the strings downstream only once both Sources have pushed something. The full output looks like this:

TestingSource queue1 no records in queue, invoking callback
TestingSource queue2 no records in queue, invoking callback
seeding first queue
seeding second queue
TestingSource queue2 found record during callback, pushed
second
TestingSource queue2 no records in queue, invoking callback
TestingSource queue1 found record during callback, pushed
first
TestingSource queue1 no records in queue, invoking callback

This phenomenon happens more frequently with MergePreferred and MergePrioritized.

My question is: is this the correct behavior of Merge? If not, what am I doing wrong?


Solution

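  • At first glance, blocking the thread with Thread.sleep inside the stage's async callback seems to be at least one of the problems. The callback handler runs on the thread that executes the stage, and by default fused stages share a single actor, so while one source sits in its sleep loop the other source and the Merge stage probably cannot make progress.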

    Anyway, I think it would be much easier to use Source.queue, as you mention at the beginning of your question. If the issue is extracting the materialized value when using the GraphDSL, here's how you do it:

        final Source<String, SourceQueueWithComplete<String>> source = Source.queue(100, OverflowStrategy.backpressure());
        final Sink<Object, CompletionStage<akka.Done>> sink = Sink.ignore();
    
        final RunnableGraph<Pair<SourceQueueWithComplete<String>, CompletionStage<akka.Done>>> g =
                RunnableGraph.fromGraph(
                        GraphDSL.create(
                                source,
                                sink,
                                Keep.both(),
                                (b, src, snk) -> {
                                    b.from(src).to(snk);
                                    return ClosedShape.getInstance();
                                }
                        )
                );
    
        final SourceQueueWithComplete<String> queue = g.run(materializer).first();  // run() returns a Pair because of Keep.both(); first() is the queue
    

    More info on this in the docs.
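
To illustrate the Thread.sleep point, here is a minimal plain-Java sketch (no Akka involved). It is only a model, under the assumption that the callbacks of both sources run on one shared thread, as is presumably the case when the stages are fused into a single actor. The class and method names (`BlockingCallbackModel`, `pollOnStageThread`, `waitOnHelperThread`) are hypothetical and exist only for this illustration. Variant A sleep-polls on the shared thread, like the question's callback. Variant B does the blocking wait on a helper thread and hands only a short task to the shared thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not Akka code: `stageThread` stands in for the single
// thread on which the callbacks of both sources are assumed to run.
final class BlockingCallbackModel {

    // Variant A: like the question's callback, sleep-poll on the stage thread itself.
    static Runnable pollOnStageThread(BlockingQueue<String> queue, List<String> emitted) {
        return () -> {
            try {
                String s = queue.poll();
                while (s == null) {
                    Thread.sleep(10);
                    s = queue.poll();
                }
                emitted.add(s); // stands in for push(out, s)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    // Variant B: block on a helper thread, then hand a short task to the stage thread.
    static Runnable waitOnHelperThread(BlockingQueue<String> queue, List<String> emitted,
                                       ExecutorService stageThread) {
        return () -> {
            try {
                String s = queue.take(); // blocks the helper thread only
                stageThread.execute(() -> emitted.add(s));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    // Seeds queue1 first and queue2 later; returns the order in which elements were emitted.
    static List<String> run(boolean blockStageThread) {
        BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();
        List<String> emitted = Collections.synchronizedList(new ArrayList<>());
        ExecutorService stageThread = Executors.newSingleThreadExecutor();
        ExecutorService helpers = Executors.newCachedThreadPool();
        try {
            // queue2's callback is scheduled first, as in the problematic runs
            if (blockStageThread) {
                stageThread.execute(pollOnStageThread(queue2, emitted));
                stageThread.execute(pollOnStageThread(queue1, emitted));
            } else {
                helpers.execute(waitOnHelperThread(queue2, emitted, stageThread));
                helpers.execute(waitOnHelperThread(queue1, emitted, stageThread));
            }
            queue1.add("first");
            Thread.sleep(200);
            queue2.add("second");

            // Let the helpers finish handing over before shutting down the stage thread.
            helpers.shutdown();
            helpers.awaitTermination(5, TimeUnit.SECONDS);
            stageThread.shutdown();
            stageThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [second, first]
        System.out.println(run(false)); // [first, second]
    }
}
```

In variant A, "first" is available early but cannot be emitted until queue2's loop releases the shared thread, which matches the output in the question. In the question's stage, the analogous change would presumably be to do the blocking wait (for example `queue.take()`) inside the task already submitted to `executor` and invoke the callback with the element, so that the callback itself only calls `push`.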