
Adding state between operations within akka stream


Below is code I use to calculate the average of a stream of data within a List of objects:

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class sd001 {

    private static final ActorSystem system = ActorSystem.create("akkassembly");
    private static List<RData> ls = new ArrayList<>();

    private static class RData {
        private String id;

        public RData(String id){
            this.id = id;
        }

        public List<Integer> getValues(){
            if(this.id.equalsIgnoreCase("1")) {
                return Arrays.asList(1, 2, 3, 4, 5);
            }
            else {
                return Arrays.asList(1, 2, 3);
            }
        }

        public String getId() {
            return this.id;
        }
    }

    final static List<RData> builderFunction() {
        try {
            ls.add(new RData("1"));
            ls.add(new RData("2"));
            ls.add(new RData("3"));
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ls;
    }

    private static double calculateAverage(List <Integer> marks) {
        return marks.stream()
                .mapToDouble(d -> d)
                .average()
                .orElse(0.0);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {

        final Source<List<RData>, NotUsed> source2 =
                Source.repeat(NotUsed.getInstance()).map(elem -> builderFunction());

        source2.mapConcat(i -> i)
                .groupBy(3, x -> x.getId())
                .map(v -> calculateAverage(v.getValues()))
                .to(Sink.foreach(x -> System.out.println(x)))
                .run(system);

    }

}

This produces the following output:

11:55:27.477 [akkassembly-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
3.0
2.0
2.0
3.0

so it appears to be working as expected.

I use the groupBy method (https://doc.akka.io/docs/akka/current/stream/stream-substream.html) to group the list of items by their associated id value. How can I add the id value to the stage where the average is output, so that the id is printed to the screen along with the average? The stage I'm referring to is:

.to(Sink.foreach(x -> System.out.println(x)))

One possible solution is to modify the getValues method to take a new id parameter and return the id in addition to the average value, which would make the id accessible within the println in the Sink. This solution seems overly complex. Do I need to carry extra state (the id in this case) between the map and to stages?


Solution

  • In general, stages in Akka Streams do not share state: they only pass elements of the stream between themselves. Thus the only general way to pass state between stages of a stream is to embed the state into the elements being passed.

    In some cases, one could use SourceWithContext/FlowWithContext:

    Essentially, a FlowWithContext is just a Flow that contains tuples of element and context, but the advantage is in the operators: most operators on FlowWithContext will work on the element rather than on the tuple, allowing you to focus on your application logic without worrying about the context.

    In this particular case, however, FlowWithContext doesn't support groupBy, because groupBy does something similar to reordering elements. So you'll have to embed the IDs into the stream elements...

    (...Unless you want to dive into the deep end of a custom graph stage, which will likely dwarf the complexity of embedding the IDs into the stream elements.)
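As a rough illustration of embedding the ID into the stream elements, here is a minimal sketch in plain Java. The class name IdWithAverage and the helpers averageWithId and describe are hypothetical names introduced for this example; they are not part of the question's code or of Akka. The sketch uses java.util.Map.Entry as the pair type so that it runs without Akka on the classpath; in an Akka project, akka.japi.Pair could presumably serve the same purpose.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class IdWithAverage {

    // Hypothetical helper: pairs an id with the average of its values,
    // so the id travels downstream inside the element itself.
    public static Map.Entry<String, Double> averageWithId(String id, List<Integer> values) {
        double average = values.stream()
                .mapToDouble(Integer::doubleValue)
                .average()
                .orElse(0.0);
        return new SimpleImmutableEntry<>(id, average);
    }

    // Hypothetical helper: formats the pair for printing in the final stage.
    public static String describe(Map.Entry<String, Double> element) {
        return "id=" + element.getKey() + " average=" + element.getValue();
    }

    public static void main(String[] args) {
        // Same sample values that RData.getValues() returns in the question.
        System.out.println(describe(averageWithId("1", Arrays.asList(1, 2, 3, 4, 5))));
        System.out.println(describe(averageWithId("2", Arrays.asList(1, 2, 3))));
    }
}
```

In the question's pipeline, the map stage would then become something like .map(v -> averageWithId(v.getId(), v.getValues())) and the sink Sink.foreach(e -> System.out.println(describe(e))), so each printed line carries both the id and the average. No shared state between stages is needed, since the id is simply part of the element.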