
Processing changing source data in Java Akka streams


Two threads are started: dataListUpdateThread appends the number 2 to a List once per second, and processFlowThread sums the values in the same List once per second and prints the sum to the console. Here is the code:

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

import static java.lang.Thread.sleep;


public class SourceExample {

    private final static ActorSystem system = ActorSystem.create("SourceExample");

    private static void delayOneSecond() {
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void printValue(CompletableFuture<Integer> integerCompletableFuture) {
        try {
            System.out.println("Sum is " + integerCompletableFuture.get().intValue());
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        // Shared by both threads; note that ArrayList is not thread-safe.
        final List<Integer> dataList = new ArrayList<>();
        final Thread dataListUpdateThread = new Thread(() -> {
            while (true) {
                dataList.add(2);
                System.out.println(dataList);
                delayOneSecond();
            }
        });
        dataListUpdateThread.start();

        final Thread processFlowThread = new Thread(() -> {
            while (true) {
                final Source<Integer, NotUsed> source = Source.from(dataList);

                final Sink<Integer, CompletionStage<Integer>> sink =
                        Sink.fold(0, (agg, next) -> agg + next);

                final CompletionStage<Integer> sum = source.runWith(sink, system);

                printValue(sum.toCompletableFuture());

                delayOneSecond();
            }
        });

        processFlowThread.start();
    }
}

I've tried to create the simplest example to frame the question. In practice, dataListUpdateThread could be populating the List from a REST service or a Kafka topic instead of just adding the value 2. Instead of using Java threads, how should this scenario be implemented? In other words, how can dataList be shared with the Akka Stream for processing?


Solution

  • Mutating the collection passed to Source.from will only do what you want by coincidence: as soon as the collection's iterator is exhausted, Source.from completes the stream. That is because it is intended for finite, strictly evaluated data; the use cases are basically (a) simple examples for the docs and (b) situations where you want to bound resource consumption while processing a collection in the background (think of a list of URLs that you want to send HTTP requests to).

    NB: I haven't written Java to any great extent since the Java 7 days, so I'm not providing Java code, just an outline of approaches.

    As mentioned in a prior answer, Source.queue is probably the best option (besides using something like Akka HTTP or an Alpakka connector). In a case such as this, where the stream's materialized value is a future that won't be completed until the stream completes, note that Source.queue will never complete the stream on its own, because there's no way for it to know that its reference is the only reference. You can complete it explicitly by calling complete() on the materialized queue, or introduce a KillSwitch and propagate it through viaMat and toMat, which gives you the ability to decide outside of the stream when to complete it.

    An alternative to Source.queue is Source.actorRef, which lets you complete the stream by sending a distinguished message (akka.Done.done() in the Java API is a common choice for this). That source materializes as an ActorRef to which you can tell messages, and those messages will be available for the stream to consume; apart from the completion message, they should be of the stream's element type, since the source does not filter out messages of other types.

    With both Source.queue and Source.actorRef, it's often useful to pre-materialize them (Source.preMaterialize), which gives you the queue or ActorRef up front together with a plain Source that you can then run. The alternative, in a situation like your example where you also want the sink's materialized value, is to make heavy use of the Mat operators to customize the materialized values. In Scala, tuples at least simplify combining multiple materialized values; in Java, once you get beyond a pair (as you would with the queue), you'd have to nest Pairs or define a class just to hold the three materialized values (the queue, the KillSwitch, and the future for the completed value).

    It's also worth noting that, since Akka Streams run on actors in the background (and thus get scheduled as needed onto the ActorSystem's threads), there's almost never a reason to create a thread on which to run a stream.
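To make the Source.from point concrete, here is a minimal sketch, assuming the Akka 2.6 Java API implied by the question's runWith(sink, system) call. The class and method names (FiniteSourceDemo, sumOnce) are hypothetical. Each materialization iterates the list once and then completes, so an element added afterwards is only seen by a second, freshly materialized stream. That is essentially what the question's polling loop relies on, and with a plain ArrayList shared across threads it also risks a ConcurrentModificationException while the stream iterates.

```java
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;

// Hypothetical demo class: shows that Source.from is finite.
class FiniteSourceDemo {

    // Materializes a fresh stream over the list. Source.from iterates the list
    // once and completes the stream as soon as the iterator is exhausted.
    static int sumOnce(List<Integer> values) {
        ActorSystem system = ActorSystem.create("FiniteSourceDemo");
        try {
            Sink<Integer, CompletionStage<Integer>> sink =
                    Sink.fold(0, (agg, next) -> agg + next);
            return Source.from(values)
                    .runWith(sink, system)
                    .toCompletableFuture()
                    .join(); // returns only because the stream completed
        } finally {
            system.terminate();
        }
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>(Arrays.asList(2, 2, 2));
        System.out.println("Sum is " + sumOnce(data)); // prints "Sum is 6"

        // The first stream has already completed; only a newly
        // materialized stream sees the element added here.
        data.add(2);
        System.out.println("Sum is " + sumOnce(data)); // prints "Sum is 8"
    }
}
```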
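The Source.queue approach with a KillSwitch could look roughly like the following sketch, again assuming the Akka 2.6 Java API. The names QueueSourceDemo and sumViaQueue, the buffer size of 16, and the constant 2 (borrowed from the question) are illustrative only. Nested akka.japi.Pair values hold the three materialized values (queue, kill switch, future sum). The main thread stands in for the REST or Kafka producer, and no extra thread is created to run the stream.

```java
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.KillSwitches;
import akka.stream.OverflowStrategy;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;

import java.util.concurrent.CompletionStage;

// Hypothetical demo class: queue-fed stream with a kill switch.
class QueueSourceDemo {

    static int sumViaQueue(int count) {
        ActorSystem system = ActorSystem.create("QueueSourceDemo");
        try {
            Sink<Integer, CompletionStage<Integer>> sink =
                    Sink.fold(0, (agg, next) -> agg + next);

            // Keep.both() twice gives ((queue, killSwitch), sum).
            Pair<Pair<SourceQueueWithComplete<Integer>, UniqueKillSwitch>, CompletionStage<Integer>> mat =
                    Source.<Integer>queue(16, OverflowStrategy.backpressure())
                            .viaMat(KillSwitches.single(), Keep.both())
                            .toMat(sink, Keep.both())
                            .run(system);

            SourceQueueWithComplete<Integer> queue = mat.first().first();
            // Calling killSwitch.shutdown() would complete the stream from outside
            // immediately, without draining elements still buffered in the queue.
            UniqueKillSwitch killSwitch = mat.first().second();
            CompletionStage<Integer> sum = mat.second();

            // The producer (a REST poller or Kafka consumer in the question) just offers elements.
            for (int i = 0; i < count; i++) {
                // With backpressure, wait for each offer to be accepted before the next one.
                queue.offer(2).toCompletableFuture().join();
            }

            // Completing the queue drains its buffer, then completes the stream,
            // which in turn completes the fold's future.
            queue.complete();
            return sum.toCompletableFuture().join();
        } finally {
            system.terminate();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum is " + sumViaQueue(3)); // prints "Sum is 6"
    }
}
```

Calling queue.complete() rather than killSwitch.shutdown() is a deliberate choice in this sketch: completing the queue lets already buffered elements reach the sink, whereas the kill switch completes the stream at its own position right away, which suits stopping from outside when draining doesn't matter.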
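A comparable sketch for Source.actorRef follows, assuming the Akka 2.6 Java API in which the completion and failure matchers are passed as functions. The names ActorRefSourceDemo and sumViaActorRef and the buffer size of 16 are hypothetical. Done.done() serves as the completion message, and CompletionStrategy.draining() is chosen so that elements already buffered are emitted before the stream completes. Unlike Source.queue, Source.actorRef does not backpressure the sender: when its buffer is full the overflow strategy applies (dropHead here), and OverflowStrategy.backpressure() is not supported for this source.

```java
import akka.Done;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

// Hypothetical demo class: stream fed by messages told to an ActorRef.
class ActorRefSourceDemo {

    static int sumViaActorRef(int count) {
        ActorSystem system = ActorSystem.create("ActorRefSourceDemo");
        try {
            Source<Integer, ActorRef> source = Source.actorRef(
                    // Completion matcher: Done.done() completes the stream after the buffer drains.
                    msg -> {
                        if (msg == Done.done()) return Optional.of(CompletionStrategy.draining());
                        else return Optional.empty();
                    },
                    // Failure matcher: no message fails the stream.
                    msg -> Optional.empty(),
                    16,
                    // Drops the oldest buffered element if the buffer overflows.
                    OverflowStrategy.dropHead());

            Sink<Integer, CompletionStage<Integer>> sink =
                    Sink.fold(0, (agg, next) -> agg + next);
            Pair<ActorRef, CompletionStage<Integer>> mat =
                    source.toMat(sink, Keep.both()).run(system);

            ActorRef ref = mat.first();
            for (int i = 0; i < count; i++) {
                ref.tell(2, ActorRef.noSender()); // element messages must be Integers
            }
            ref.tell(Done.done(), ActorRef.noSender()); // distinguished completion message
            return mat.second().toCompletableFuture().join();
        } finally {
            system.terminate();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum is " + sumViaActorRef(3)); // prints "Sum is 6"
    }
}
```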
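Pre-materialization of a queue source might look like this, under the same Akka 2.6 assumption (the preMaterialize overload that accepts the ActorSystem); the class and method names are hypothetical. Source.preMaterialize hands back the queue immediately together with a Source<Integer, NotUsed>. The rest of the stream is then run with an ordinary runWith, so its materialized value is just the sink's future and no Pair nesting or holder class is needed.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;

import java.util.concurrent.CompletionStage;

// Hypothetical demo class: pre-materialized Source.queue.
class PreMaterializeDemo {

    static int sumViaPreMaterializedQueue(int count) {
        ActorSystem system = ActorSystem.create("PreMaterializeDemo");
        try {
            // Step 1: materialize only the source; the queue is available right away,
            // plus a Source<Integer, NotUsed> to attach to the rest of the stream.
            Pair<SourceQueueWithComplete<Integer>, Source<Integer, NotUsed>> pre =
                    Source.<Integer>queue(16, OverflowStrategy.backpressure())
                            .preMaterialize(system);
            SourceQueueWithComplete<Integer> queue = pre.first();

            // Step 2: run the remaining stream; its materialized value is only the sink's.
            Sink<Integer, CompletionStage<Integer>> sink =
                    Sink.fold(0, (agg, next) -> agg + next);
            CompletionStage<Integer> sum = pre.second().runWith(sink, system);

            for (int i = 0; i < count; i++) {
                // Wait for each offer to be accepted before the next one.
                queue.offer(2).toCompletableFuture().join();
            }
            // Drain the buffer and complete the stream.
            queue.complete();
            return sum.toCompletableFuture().join();
        } finally {
            system.terminate();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum is " + sumViaPreMaterializedQueue(3)); // prints "Sum is 6"
    }
}
```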