Below is code I use to calculate the average of a stream of data within a List of objects:
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class sd001 {

    private static final ActorSystem system = ActorSystem.create("akkassembly");
    private static List<RData> ls = new ArrayList<>();

    private static class RData {
        private String id;

        public RData(String id) {
            this.id = id;
        }

        // Returns a fixed set of values that depends only on the id.
        public List<Integer> getValues() {
            if (this.id.equalsIgnoreCase("1")) {
                return Arrays.asList(1, 2, 3, 4, 5);
            } else {
                return Arrays.asList(1, 2, 3);
            }
        }

        public String getId() {
            return this.id;
        }
    }

    // Appends three more items to the shared list on every call, then waits 3 seconds.
    final static List<RData> builderFunction() {
        try {
            ls.add(new RData("1"));
            ls.add(new RData("2"));
            ls.add(new RData("3"));
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ls;
    }

    private static double calculateAverage(List<Integer> marks) {
        return marks.stream()
                .mapToDouble(d -> d)
                .average()
                .orElse(0.0);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        final Source<List<RData>, NotUsed> source2 =
                Source.repeat(NotUsed.getInstance()).map(elem -> builderFunction());

        source2.mapConcat(i -> i)
                .groupBy(3, x -> x.getId())
                .map(v -> calculateAverage(v.getValues()))
                .to(Sink.foreach(x -> System.out.println(x)))
                .run(system);
    }
}
This produces the following output:
11:55:27.477 [akkassembly-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
3.0
2.0
2.0
3.0
so it appears to be working as expected.
I use the groupBy method (https://doc.akka.io/docs/akka/current/stream/stream-substream.html) to group the List of items by their associated id value. How can I add the id value to the stage where the average is output, so that the id is printed to the screen along with the average? The stage I'm referring to is:
.to(Sink.foreach(x -> System.out.println(x)))
One possible solution is to modify the method getValues, adding a new parameter id and returning the id in addition to the average value; this would allow accessing the id within the println for the Sink. This solution seems overly complex. Do I need to carry extra state (the id in this case) between the map and to functions?
In general, stages in Akka Streams do not share state: they only pass elements of the stream between themselves. Thus the only general way to pass state between stages of a stream is to embed the state into the elements being passed.
In some cases, one could use SourceWithContext/FlowWithContext:
Essentially, a FlowWithContext is just a Flow that contains tuples of element and context, but the advantage is in the operators: most operators on FlowWithContext will work on the element rather than on the tuple, allowing you to focus on your application logic without worrying about the context.
In this particular case, since groupBy does something similar to reordering elements, FlowWithContext doesn't support groupBy, so you'll have to embed the IDs into the stream elements...
(...Unless you want to dive into the deep end of a custom graph stage, which will likely dwarf the complexity of embedding the IDs into the stream elements.)
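As a rough illustration of embedding the ID into the stream elements, here is a minimal sketch in plain Java. The names IdAverageSketch, IdAverage and toIdAverage are hypothetical, made up for this example; they are not part of the question's code or of Akka. The idea is only that the map stage emits an element carrying both the id and the average, rather than the bare average.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: IdAverageSketch, IdAverage and toIdAverage are
// illustrative names, not part of the question's code or of Akka.
class IdAverageSketch {

    // Immutable element that carries the id together with the computed average.
    static final class IdAverage {
        final String id;
        final double average;

        IdAverage(String id, double average) {
            this.id = id;
            this.average = average;
        }

        @Override
        public String toString() {
            return id + " -> " + average;
        }
    }

    // Same averaging logic as calculateAverage in the question.
    static double calculateAverage(List<Integer> marks) {
        return marks.stream().mapToDouble(d -> d).average().orElse(0.0);
    }

    // Embeds the id into the emitted element instead of returning the bare average.
    static IdAverage toIdAverage(String id, List<Integer> values) {
        return new IdAverage(id, calculateAverage(values));
    }

    public static void main(String[] args) {
        System.out.println(toIdAverage("1", Arrays.asList(1, 2, 3, 4, 5))); // prints "1 -> 3.0"
        System.out.println(toIdAverage("2", Arrays.asList(1, 2, 3)));       // prints "2 -> 2.0"
    }
}
```

In the question's stream, the map stage would then presumably become .map(v -> toIdAverage(v.getId(), v.getValues())), and the existing Sink.foreach(x -> System.out.println(x)) would print the id next to the average. If a dedicated class feels heavy, akka.japi.Pair (created with Pair.create) could likely play the same role.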