How to access the computation result of an Akka Stream?


I'm attempting to return the result of a stream operation which in this case is:

  1. sum a list
  2. square the value
  3. square the value

which is represented as:

        .fold(0, (aggr, next) -> aggr + next)
        .map(x -> x * x)
        .map(x -> x * x)

To access the value I use

final AtomicInteger returnValue = new AtomicInteger();

followed by:

        .to(Sink.foreach(x -> {
            returnValue.set(x);
            System.out.println("got: " + x);
        }))

This requires a blocking call to give the stream time to complete, which is not acceptable:

Thread.sleep(2000);

If I use:

    CompletableFuture<Object> futureValue =
            ask(actorRef, Done.done(), Duration.ofMillis(5000)).toCompletableFuture();
    System.out.println(futureValue.toCompletableFuture().get().toString());

an error is returned:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://StreamsExamples/system/Materializers/StreamSupervisor-0/$$a-actorRefSource#1663100910]] after [5000 ms]. Message of type [akka.Done$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

The recipient actor in this case is the actor behind the Source, which returns the following when it receives a Done.done() message:

return Optional.of(CompletionStrategy.immediately());

Can an Akka stream return the computed value to the caller? Or is the only alternative to store the computed value in a DB, or send it to a Kafka topic, at the point where it is computed in:

.to(Sink.foreach(x -> {

?

Complete src:

import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class GetStreamValue {

    final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");

    public static void main(String args[]) throws InterruptedException, ExecutionException {


        int bufferSize = 100;
        final Source<Integer, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) {
                                return Optional.of(CompletionStrategy.immediately());
                            }
                            else {
                                return Optional.empty();
                            }
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());

        final AtomicInteger returnValue = new AtomicInteger();

        final ActorRef actorRef = source
                .fold(0, (aggr, next) -> aggr + next)
                .map(x -> x * x)
                .map(x -> x * x)
                .to(Sink.foreach(x -> {
                    returnValue.set(x);
                    System.out.println("got: " + x);
                }))
                .run(system);

        Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        actorRef.tell(Done.done(), ActorRef.noSender());

        Thread.sleep(2000);

        System.out.println("returnValue is "+returnValue);

    }
}
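
As a sanity check on the value the stream should eventually produce, the same fold-then-square-twice computation can be sketched in plain Java without Akka. The class and method names here are illustrative assumptions, not part of the original program.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, used only to work out the expected result.
class ExpectedValue {

    // Mirrors the pipeline: fold(0, +), then map(x -> x * x) twice.
    static int compute(List<Integer> elements) {
        int sum = elements.stream().reduce(0, Integer::sum);
        int squared = sum * sum;
        return squared * squared;
    }

    public static void main(String[] args) {
        // Same elements the program sends to the ActorRef: 1, 2, 3 and then 1, 2.
        // sum = 9, 9 * 9 = 81, 81 * 81 = 6561
        System.out.println(compute(Arrays.asList(1, 2, 3, 1, 2))); // prints 6561
    }
}
```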

Solution

  • I think what you may be missing is the concept of a materialized value in Akka Streams. Scan through this part of the documentation, especially the section on combining materialized values. I also had a go at explaining this concept here (search for Materialized value). Once you grok materialized values, what I write here should make more sense.

    A call to Source.actorRef(..) returns Source<T, ActorRef>, where T is the data type of the elements flowing through the stream (in your case Integer) and ActorRef is the materialized value of that Source. You get the materialized value synchronously when you call run on the RunnableGraph, which is what the to(...) call returns.

    That ActorRef is how you can "drive" the stream as per Source.actorRef(...) semantics.

    Now the question is how to get your hands on the data that passes through the stream. In your case you're reducing all Integers into one, so instead of using Sink.foreach(...), which is good for side effects, you can use Sink.head. Sinks can also produce materialized values, and Sink.head materializes to a CompletionStage of the first element in the stream, which in your case is the only element. So let's try that:

    final ActorRef actorRef = source
                                    .fold(0, (aggr, next) -> aggr + next)
                                    .map(x -> x * x)
                                    .map(x -> x * x)
                                    .to(Sink.head())
                                    .run(system);
    

    OK, that didn't help much. You are still getting just the materialized value of the Source, because to(...) keeps only the upstream (left-hand) materialized value. To get the materialized value of the Sink, we need to ask for it explicitly:

    // Needs imports: akka.japi.Pair, akka.stream.javadsl.Keep, java.util.concurrent.CompletionStage
    final Pair<ActorRef, CompletionStage<Integer>> matVals =
          source
            .fold(0, (aggr, next) -> aggr + next)
            .map(x -> x * x)
            .map(x -> x * x)
            .toMat(Sink.head(), Keep.both())
            .run(system);
    

    Now we get both the Source and the Sink materialized values. You can drive your stream via the ActorRef as before:

    final ActorRef actorRef = matVals.first();
    
    Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
    Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
    actorRef.tell(Done.done(), ActorRef.noSender());
    

    You can also use the CompletionStage API to get your value out of the stream, for example:

    Integer folded = matVals.second().toCompletableFuture().join(); 
    

    Yes, this is blocking, but you somehow need to stop the main thread from finishing before the stream runs to completion.
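
When the caller does not have to wait (for example inside a long-running service rather than a short main method), the CompletionStage can be consumed with callbacks instead of join(). The sketch below uses only the JDK: a plain CompletableFuture stands in for matVals.second(), and the class name NonBlockingResult and the method describe are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical example class; the CompletableFuture below is a stand-in for
// the CompletionStage<Integer> that Sink.head() materializes.
class NonBlockingResult {

    // Registers callbacks on the stage instead of blocking on it.
    static CompletionStage<String> describe(CompletionStage<Integer> result) {
        return result
                .thenApply(value -> "got: " + value)
                // Runs only if the stage completed with a failure.
                .exceptionally(error -> "stream failed: " + error.getMessage());
    }

    public static void main(String[] args) {
        // Stand-in for matVals.second(); nothing has completed it yet.
        CompletableFuture<Integer> standIn = new CompletableFuture<>();

        // The callback is attached now and runs once the value arrives.
        describe(standIn).thenAccept(System.out::println);

        // Simulates the stream emitting its single folded element.
        standIn.complete(6561);
    }
}
```

With a real stream, the callbacks run once the stream completes, so no thread is parked waiting on the result. The actor system still has to stay alive until then.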