Search code examples
javaakka

Merging 2 sources in streams


I use final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create); to combine two sources. Reading https://doc.akka.io/docs/akka/current/stream/operators/Source/combine.html does not provide an example of merging > 2 sources so I'm unsure if using null is a correct method of combining 2 sources.

When I run the below code 100.0 is continually printed. Each source computes the average of a sliding window of values where each window size is 3. The difference between each source is source1 utilises and source2 utilises 10. But source2 is not being executed as

sources.to(printSink).run(actorSystem); just outputs `100` - the first source result.

How to correctly combine source1 and source2 such that each source is executed?

src :

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Concat;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.time.Duration;
import java.util.concurrent.CompletionStage;


public class MultipleStreams {

    public static void main(String args[]) {

        ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");

        final String json1 = "100";
        Sink<Double, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);

        final Source<Double, NotUsed> source1 = Source.repeat(json1).throttle(3, Duration.ofMillis(1000))
                .sliding(3, 3)
                .map(x -> {
                    final Double b = x.stream()
                            .mapToDouble(a -> Double.valueOf(a))
                            .sum() / x.size();
                    return b;
                });

        final String json2 = "10";
        final Source<Double, NotUsed> source2 = Source.repeat(json2).throttle(3, Duration.ofMillis(1000))
                .sliding(3, 3)
                .map(x -> {
                    return x.stream()
                            .mapToDouble(a -> Double.valueOf(a))
                            .sum() / x.size();
                });

        final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create);

        sources.to(printSink).run(actorSystem);

    }
}

Solution

  • Concat tries to empty the first source first.

    Changing it to Merge gives the Output

    100.0
    10.0
    100.0
    10.0
    100.0
    10.0
    100.0
    10.0
    10.0
    100.0
    10.0