Two threads are started. dataListUpdateThread adds the number 2 to a List. processFlowThread sums the values in the same List and prints the sum to the console. Here is the code:

    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.ExecutionException;

    import static java.lang.Thread.sleep;

    public class SourceExample {

        private final static ActorSystem system = ActorSystem.create("SourceExample");

        private static void delayOneSecond() {
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private static void printValue(CompletableFuture<Integer> integerCompletableFuture) {
            try {
                System.out.println("Sum is " + integerCompletableFuture.get().intValue());
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {

            final List dataList = new ArrayList<Integer>();

            final Thread dataListUpdateThread = new Thread(() -> {
                while (true) {
                    dataList.add(2);
                    System.out.println(dataList);
                    delayOneSecond();
                }
            });
            dataListUpdateThread.start();

            final Thread processFlowThread = new Thread(() -> {
                while (true) {
                    final Source<Integer, NotUsed> source = Source.from(dataList);
                    final Sink<Integer, CompletionStage<Integer>> sink =
                            Sink.fold(0, (agg, next) -> agg + next);
                    final CompletionStage<Integer> sum = source.runWith(sink, system);
                    printValue(sum.toCompletableFuture());
                    delayOneSecond();
                }
            });
            processFlowThread.start();
        }
    }

I've tried to create the simplest example to frame the question. dataListUpdateThread could be populating the List from a REST service or a Kafka topic instead of just adding the value 2 to the List. Instead of using Java threads, how should this scenario be implemented? In other words, how do I share dataList with the Akka Stream for processing?
Mutating the collection passed to Source.from is only ever going to accomplish this by coincidence: if the collection is ever exhausted, Source.from will complete the stream. This is because it's intended for finite, strictly evaluated data. The use cases are basically: a) simple examples for the docs, and b) situations where you want to bound resource consumption when performing an operation for each element of a collection in the background (think of a list of URLs that you want to send HTTP requests to).
NB: I haven't written Java to any great extent since the Java 7 days, so I'm not providing Java code, just an outline of approaches.
As mentioned in a prior answer, Source.queue is probably the best option (besides using something like Akka HTTP or an Alpakka connector). In a case such as this, the stream's materialized value is a future that won't be completed until the stream completes, and Source.queue will never complete the stream on its own (because there's no way for it to know that its reference is the only reference). Introducing a KillSwitch and propagating it through viaMat and toMat would give you the ability to complete the stream from outside of it.
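
The queue-plus-KillSwitch wiring described above might look roughly like the following in Java. This is a minimal sketch, assuming Akka 2.6 (where `run` accepts an `ActorSystem` directly); the class name `QueueWithKillSwitchExample`, the helper `sumOfTwos`, and the buffer size of 16 are made up for illustration. A nested `akka.japi.Pair` carries the three materialized values here.

```java
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.KillSwitches;
import akka.stream.OverflowStrategy;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
import java.util.concurrent.CompletionStage;

public class QueueWithKillSwitchExample {

    // Hypothetical helper: offers `count` copies of the value 2, stops the
    // stream from outside via the kill switch, and returns the folded sum.
    static int sumOfTwos(int count) {
        final ActorSystem system = ActorSystem.create("QueueWithKillSwitchExample");
        try {
            final Sink<Integer, CompletionStage<Integer>> sink =
                Sink.fold(0, (agg, next) -> agg + next);

            // Keep.both() twice: first (queue, killSwitch), then ((queue, killSwitch), sum).
            final Pair<Pair<SourceQueueWithComplete<Integer>, UniqueKillSwitch>, CompletionStage<Integer>> mat =
                Source.<Integer>queue(16, OverflowStrategy.backpressure())
                    .viaMat(KillSwitches.single(), Keep.both())
                    .toMat(sink, Keep.both())
                    .run(system);

            final SourceQueueWithComplete<Integer> queue = mat.first().first();
            final UniqueKillSwitch killSwitch = mat.first().second();
            final CompletionStage<Integer> sum = mat.second();

            for (int i = 0; i < count; i++) {
                // Wait until the queue has responded to each offer before sending the next.
                queue.offer(2).toCompletableFuture().join();
            }

            // Completes downstream and cancels upstream, so the fold can finish.
            killSwitch.shutdown();
            return sum.toCompletableFuture().join();
        } finally {
            system.terminate();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum is " + sumOfTwos(3));
    }
}
```

Note that shutting down the kill switch does not wait for elements still buffered upstream of it, so anything the queue has accepted but not yet emitted may be dropped.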
An alternative to Source.queue is Source.actorRef, which lets you send a distinguished message to complete the stream (akka.Done.done() in the Java API is pretty common for this). That source materializes as an ActorRef to which you can tell messages, and those messages (at least those which match the type of the stream) will be available for the stream to consume.
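
As a rough illustration of the Source.actorRef approach, here is a minimal sketch assuming the Akka 2.6 Java API, in which `Source.actorRef` takes a completion matcher, a failure matcher, a buffer size, and an overflow strategy. The class name `ActorRefSourceExample`, the helper `sumOfTwos`, the buffer size, and the choice of `dropHead` and `draining` are assumptions made for the example, not something the answer prescribes.

```java
import akka.Done;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

public class ActorRefSourceExample {

    // Hypothetical helper: tells `count` copies of the value 2 to the source's
    // ActorRef, then sends Done to complete the stream, and returns the sum.
    static int sumOfTwos(int count) {
        final ActorSystem system = ActorSystem.create("ActorRefSourceExample");
        try {
            final Source<Integer, ActorRef> source =
                Source.actorRef(
                    message -> {
                        // Done is the distinguished "complete the stream" message;
                        // draining emits already-buffered elements first.
                        if (message == Done.done()) {
                            return Optional.of(CompletionStrategy.draining());
                        } else {
                            return Optional.empty();
                        }
                    },
                    // No message is treated as a failure in this sketch.
                    message -> Optional.empty(),
                    16,
                    OverflowStrategy.dropHead());

            final Sink<Integer, CompletionStage<Integer>> sink =
                Sink.fold(0, (agg, next) -> agg + next);

            final Pair<ActorRef, CompletionStage<Integer>> mat =
                source.toMat(sink, Keep.both()).run(system);
            final ActorRef ref = mat.first();

            for (int i = 0; i < count; i++) {
                ref.tell(2, ActorRef.noSender());
            }
            ref.tell(Done.done(), ActorRef.noSender());

            return mat.second().toCompletableFuture().join();
        } finally {
            system.terminate();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum is " + sumOfTwos(3));
    }
}
```

Unlike the queue, this source offers no backpressure to the sender: if messages arrive faster than the stream consumes them and the buffer fills, the overflow strategy decides what is dropped.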
With both Source.queue and Source.actorRef, it's often useful to prematerialize them. The alternative, in a situation like your example where you also want the materialized value of the sink, is to make heavy use of the Mat operators to customize materialized values. In Scala, it's possible to use tuples to at least simplify combining multiple materialized values, but in Java, once you get beyond a pair (as you would with queue), I'm pretty sure you'd have to define a class just to hold the three materialized values (queue, kill switch, and the future for the completed value).
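
Prematerializing the queue source could be sketched as follows, again assuming Akka 2.6, where `preMaterialize(system)` returns a `Pair` of the materialized value and a new `Source` that can be wired up like any other. The class name `PreMaterializeExample` and the helper `sumOfTwos` are hypothetical. To keep the sketch short, it ends the stream by calling `complete()` on the queue handle rather than through a KillSwitch.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
import java.util.concurrent.CompletionStage;

public class PreMaterializeExample {

    // Hypothetical helper: obtains the queue handle up front, runs the rest of
    // the stream separately, and returns the folded sum.
    static int sumOfTwos(int count) {
        final ActorSystem system = ActorSystem.create("PreMaterializeExample");
        try {
            // The queue handle is available before the downstream is attached.
            final Pair<SourceQueueWithComplete<Integer>, Source<Integer, NotUsed>> pre =
                Source.<Integer>queue(16, OverflowStrategy.backpressure())
                    .preMaterialize(system);
            final SourceQueueWithComplete<Integer> queue = pre.first();
            final Source<Integer, NotUsed> source = pre.second();

            final Sink<Integer, CompletionStage<Integer>> sink =
                Sink.fold(0, (agg, next) -> agg + next);

            // Only the sink's materialized value remains, so no Mat combinators are needed.
            final CompletionStage<Integer> sum = source.runWith(sink, system);

            for (int i = 0; i < count; i++) {
                queue.offer(2).toCompletableFuture().join();
            }
            // Explicitly signal that no more elements will be offered.
            queue.complete();

            return sum.toCompletableFuture().join();
        } finally {
            system.terminate();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum is " + sumOfTwos(3));
    }
}
```

The queue handle can then be passed to whatever produces the data (a REST client callback or a Kafka consumer, say) without that code needing to know anything about the rest of the stream.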
It's also worth noting that, since Akka Streams run on actors in the background (and thus get scheduled as needed onto the ActorSystem's threads), there's almost never a reason to create a thread on which to run a stream.