I'm attempting to return the result of a stream operation, which in this case sums the elements and then squares the result twice. It is represented as:
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
To access the value I use
final AtomicInteger returnValue = new AtomicInteger();
followed by:
.to(Sink.foreach(x -> {
returnValue.set(x);
System.out.println("got: " + x);
}))
This requires a blocking call to give the stream time to complete, which is not acceptable:
Thread.sleep(2000);
If I use:
CompletableFuture<Object> futureValue =
ask(actorRef, Done.done(), Duration.ofMillis(5000)).toCompletableFuture();
System.out.println(futureValue.toCompletableFuture().get().toString());
an error is returned:
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://StreamsExamples/system/Materializers/StreamSupervisor-0/$$a-actorRefSource#1663100910]] after [5000 ms]. Message of type [akka.Done$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
The recipient actor in this case is the Source, which returns the following on a Done.done() message:
return Optional.of(CompletionStrategy.immediately());
Can an Akka stream be used to return the computed value from the stream? Or is the only alternative to store the computed value in a DB, or send it to a Kafka topic, at the point where the value is computed in:
.to(Sink.foreach(x -> {
?
Complete source:
import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class GetStreamValue {

    final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int bufferSize = 100;
        final Source<Integer, ActorRef> source =
            Source.actorRef(
                elem -> {
                    // complete stream immediately if we send it Done
                    if (elem == Done.done()) {
                        return Optional.of(CompletionStrategy.immediately());
                    } else {
                        return Optional.empty();
                    }
                },
                // never fail the stream because of a message
                elem -> Optional.empty(),
                bufferSize,
                OverflowStrategy.dropHead());

        final AtomicInteger returnValue = new AtomicInteger();
        final ActorRef actorRef = source
            .fold(0, (aggr, next) -> aggr + next)
            .map(x -> x * x)
            .map(x -> x * x)
            .to(Sink.foreach(x -> {
                returnValue.set(x);
                System.out.println("got: " + x);
            }))
            .run(system);

        Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        Arrays.asList(1, 2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        actorRef.tell(Done.done(), ActorRef.noSender());

        Thread.sleep(2000);
        System.out.println("returnValue is " + returnValue);
    }
}
I think what you may be missing is the concept of a materialized value in Akka Streams. Scan through the part of the documentation that covers it, especially the section on combining materialized values. I also had a go at explaining this concept elsewhere (search for "Materialized value"). If you grok materialized values, then what I write here will perhaps make more sense.
A call to Source.actorRef(...) returns Source<T, ActorRef>, where T is the data type of the elements flowing through the stream (in your case it's Integer) and ActorRef is the materialized value of that Source. You get the materialized value synchronously when you call run on the RunnableGraph, which is what the to(...) call returns.
That ActorRef is how you can "drive" the stream as per the Source.actorRef(...) semantics.
Now the question is how you get your hands on the data that passes through the stream. In your case you're reducing all the Integers into one, so instead of using Sink.foreach(...), which is good for side effects, you can use Sink.head. You see, Sinks can also produce materialized values, and Sink.head materializes to a CompletionStage of the first element in the stream, which in your case is the only element. So let's try that:
final ActorRef actorRef = source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
.to(Sink.head())
.run(system);
Ok, that didn't help too much. You are still getting just the materialized value of the Source, because to(...) keeps only the upstream materialized value. To get the materialized value of the Sink we need to explicitly ask for it:
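// additional imports: akka.japi.Pair, akka.stream.javadsl.Keep, java.util.concurrent.CompletionStage
final Pair<ActorRef, CompletionStage<Integer>> matVals =
    source
        .fold(0, (aggr, next) -> aggr + next)
        .map(x -> x * x)
        .map(x -> x * x)
        // Keep.both() keeps the Source's and the Sink's materialized values as a Pair
        .toMat(Sink.head(), Keep.both())
        .run(system);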
Now we get both the Source and the Sink materialized values. You can drive your stream via the ActorRef as before:
final ActorRef actorRef = matVals.first();
Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
actorRef.tell(Done.done(), ActorRef.noSender());
You can also use the CompletionStage API to get your value out of the stream, for example:
Integer folded = matVals.second().toCompletableFuture().join();
Yes, this is blocking, but you somehow need to stop the main thread from finishing before the stream runs to completion.
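In a longer-lived application, where something else keeps the JVM alive, you would normally not need join() at all: you can attach follow-up work to the CompletionStage and let it run when the stream completes. The sketch below uses only the JDK so it can be run on its own. The CompletableFuture is a stand-in for matVals.second(), the describe helper and the class name are hypothetical, and the value 6561 assumes that all five elements (1, 2, 3, 1, 2) reach the fold.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class NonBlockingResult {

    // Hypothetical helper: attaches follow-up work to the stage instead of blocking on it.
    static CompletionStage<String> describe(CompletionStage<Integer> folded) {
        return folded.thenApply(value -> "returnValue is " + value);
    }

    public static void main(String[] args) {
        // Stand-in for matVals.second(); in the real program Akka completes this stage.
        CompletableFuture<Integer> folded = new CompletableFuture<>();

        // Registered now, executed only once the stage completes.
        describe(folded).thenAccept(System.out::println);

        // Simulates the stream finishing. Assumes all five elements were folded:
        // 1 + 2 + 3 + 1 + 2 = 9, 9 * 9 = 81, 81 * 81 = 6561.
        folded.complete(6561);
    }
}
```

With the real stream you would pass matVals.second() to describe(...) in place of the hand-completed future; nothing in the calling thread waits for the result.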