Tags: java, scala, akka, akka-stream

Changing source data for akka streams


I'm learning about Java Akka Streams and, using https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html, have defined the following:

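import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

public class SourceExample {

    static ActorSystem system = ActorSystem.create("SourceExample");

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        final List<Integer> sourceData = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Emits each element of the list, then signals completion.
        final Source<Integer, NotUsed> source =
                Source.from(sourceData);
        // Folds all elements into a running sum; materializes the final result.
        final Sink<Integer, CompletionStage<Integer>> sink =
                Sink.<Integer, Integer>fold(0, (agg, next) -> agg + next);

        // Materializes and runs the stream.
        final CompletionStage<Integer> sum = source.runWith(sink, system);

        System.out.println(sum.toCompletableFuture().get());

        // Shut down the actor system so the JVM can exit.
        system.terminate();
    }

}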

Running this code works as expected (it prints 55).

Is the problem Akka Streams solves that this code can be executed repeatedly?

In a real-world scenario, sourceData will not be static. Does Akka Streams have an opinion on how changing data should be handled, or is that left to the developer?

In the simplest case, should I just re-run the stream every X minutes (for example, from a scheduled task) when the source data changes? Or are Akka streams long-lived, so that the stream keeps running as the source data changes and the computations are re-executed according to some parameters?

The Akka Streams documentation describes multiple kinds of data source, but I don't understand how Akka Streams should be used to handle changing source data.
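For comparison, the first option in the question (re-running a finite stream on a timer) might look roughly like the sketch below. It assumes Akka 2.6, where `Scheduler.scheduleAtFixedRate(Duration, Duration, Runnable, ExecutionContext)` is available. `loadCurrentData` and `sumsOnSchedule` are hypothetical helpers; `loadCurrentData` stands in for wherever the real data lives (a database, file or service). Each tick materializes a brand-new stream, so only the timer is long-lived.

```java
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduledRerunExample {

    // Hypothetical stand-in for the real data source (database, file, service, ...).
    // Every call returns a "newer" snapshot [v, v, v], which sums to 3 * v.
    private static final AtomicInteger snapshot = new AtomicInteger();

    static List<Integer> loadCurrentData() {
        int v = snapshot.incrementAndGet();
        return Arrays.asList(v, v, v);
    }

    // Hypothetical helper: starts `runs` independent, finite streams, one per tick,
    // and returns their sums in completion order.
    static List<Integer> sumsOnSchedule(int runs, Duration interval) throws InterruptedException {
        ActorSystem system = ActorSystem.create("ScheduledRerunExample");
        List<Integer> results = new CopyOnWriteArrayList<>();
        AtomicInteger started = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(runs);

        Cancellable task = system.scheduler().scheduleAtFixedRate(
                Duration.ZERO,
                interval,
                () -> {
                    if (started.getAndIncrement() >= runs) {
                        return; // enough runs started; ignore the remaining ticks
                    }
                    // Each tick materializes a brand-new stream over the latest snapshot.
                    Source.from(loadCurrentData())
                            .runWith(Sink.<Integer, Integer>fold(0, Integer::sum), system)
                            .thenAccept(sum -> {
                                results.add(sum);
                                finished.countDown();
                            });
                },
                system.dispatcher());

        try {
            finished.await(10, TimeUnit.SECONDS);
        } finally {
            task.cancel();
            system.terminate();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumsOnSchedule(3, Duration.ofMillis(200)));
    }
}
```

Each run is independent: nothing carries over between runs, and data that changes between ticks is only picked up on the next tick.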


Solution

  • Akka streams can, and often do, run until (shortly before) your app stops. For instance, it's common for a stream that consumes Kafka records (e.g. using a Kafka consumer source from Alpakka Kafka) to start very early in the app's lifetime and not stop until the app is killed.

    To elaborate, a stream runs until one of the following happens:

    • a stage signals completion (e.g. in your example, Source.from would signal completion after emitting 10)
    • a stage fails (typically throwing an exception)
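
    A minimal sketch of the two termination cases above, assuming Akka 2.6 and using `Source.range(1, 10)` in place of the question's `Source.from(list)`. `runAndDescribe` and its `failAt` parameter are hypothetical names. When the source completes, the `CompletionStage` materialized by `Sink.fold` completes with the sum; when a stage throws, the stream fails and that `CompletionStage` completes exceptionally.

    ```java
    import akka.actor.ActorSystem;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.ExecutionException;

    public class StreamTerminationExample {

        // Hypothetical helper: sums 1..10, but a map stage throws when it sees `failAt`
        // (pass a value outside 1..10 to let the stream complete normally).
        static String runAndDescribe(int failAt) throws InterruptedException {
            ActorSystem system = ActorSystem.create("StreamTerminationExample");
            try {
                CompletionStage<Integer> sum = Source.range(1, 10)
                        .map(i -> {
                            if (i == failAt) {
                                throw new IllegalStateException("boom at " + i);
                            }
                            return i;
                        })
                        .runWith(Sink.<Integer, Integer>fold(0, Integer::sum), system);
                try {
                    // Source.range signals completion after emitting 10, so the fold result arrives.
                    return "completed: " + sum.toCompletableFuture().get();
                } catch (ExecutionException e) {
                    // A stage threw, so the stream failed and the materialized value failed with it.
                    return "failed: " + e.getCause().getMessage();
                }
            } finally {
                system.terminate();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            System.out.println(runAndDescribe(0)); // prints "completed: 55"
            System.out.println(runAndDescribe(5));
        }
    }
    ```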

    An example source that's useful for dynamic data (without introducing Alpakka or Akka HTTP) is Source.queue, which materializes as a queue: elements offered to that queue become available to the running stream.
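
    A minimal sketch of Source.queue, assuming Akka 2.6 and its `Source.queue(int bufferSize, OverflowStrategy)` overload; `sumViaQueue` is a hypothetical helper, and the actor system is created inside it only to keep the example self-contained. The stream is materialized first and keeps running; elements are pushed in through the queue, and the fold only completes after `queue.complete()` signals that no more data is coming.

    ```java
    import akka.actor.ActorSystem;
    import akka.japi.Pair;
    import akka.stream.OverflowStrategy;
    import akka.stream.javadsl.Keep;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;
    import akka.stream.javadsl.SourceQueueWithComplete;

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.ExecutionException;

    public class QueueSourceExample {

        // Hypothetical helper: pushes `data` into an already-running stream via
        // Source.queue and returns the fold result once the queue is completed.
        static int sumViaQueue(List<Integer> data) throws InterruptedException, ExecutionException {
            ActorSystem system = ActorSystem.create("QueueSourceExample");
            try {
                // Running the graph yields both materialized values: the queue used to
                // feed data in, and the CompletionStage of the running sum.
                Pair<SourceQueueWithComplete<Integer>, CompletionStage<Integer>> materialized =
                        Source.<Integer>queue(16, OverflowStrategy.backpressure())
                                .toMat(Sink.<Integer, Integer>fold(0, Integer::sum), Keep.both())
                                .run(system);
                SourceQueueWithComplete<Integer> queue = materialized.first();
                CompletionStage<Integer> sum = materialized.second();

                // The stream is already running; offer elements whenever they "arrive".
                for (Integer element : data) {
                    // With OverflowStrategy.backpressure(), wait for each offer to be
                    // accepted before making the next one.
                    queue.offer(element).toCompletableFuture().get();
                }

                // The stream (and therefore the fold) only completes once we say so.
                queue.complete();
                return sum.toCompletableFuture().get();
            } finally {
                system.terminate();
            }
        }

        public static void main(String[] args) throws InterruptedException, ExecutionException {
            System.out.println(sumViaQueue(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))); // prints 55
        }
    }
    ```

    In a long-running app, the queue would typically be kept around for the lifetime of the stream and completed only on shutdown; the loop here just stands in for data arriving over time.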