I have defined a simple graph that combines a constant stream (defined via Source.single(5000)) with a non-constant source (e.g. Source(1 to 100)). My graph adds the two numbers together via a ZipLatestWith operator. The intention is for the output of the stream to be 5001, 5002, 5003, 5004, ..., 5100. In reality, the output of the program is simply 5000, and then the stream terminates, presumably because the Source.single source completes.
How do I get the expected result, where the constant value of 5000 is combined with every value of the non-constant source? Note that for conceptual reasons (irrelevant to this specific example), it's important that the constant source remains a source of the stream. A complete code example follows; it simply prints the number 5000 to the console.
Main.scala:
import akka.NotUsed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Behavior, Terminated}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, ZipLatestWith}
import akka.stream.{FlowShape, Materializer}
object Main {
  def apply(): Behavior[NotUsed] = {
    Behaviors.setup { context =>
      val constantStream: Source[Int, NotUsed] = Source.single(5000)
      val graph: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create() {
        implicit builder =>
          import GraphDSL.Implicits._
          val zipper = builder.add(ZipLatestWith[Int, Int, Int]((a: Int, b: Int) => a * b))
          constantStream ~> zipper.in1
          FlowShape(zipper.in0, zipper.out)
      })
      Source(1 to 100)
        .via(graph)
        .to(Sink.foreach(println)).run()(Materializer.apply(context))
      Behaviors.receiveSignal {
        case (_, Terminated(_)) =>
          Behaviors.stopped
      }
    }
  }

  def main(args: Array[String]): Unit = {
    ActorSystem(Main(), "test")
  }
}
build.sbt:
scalaVersion := "2.12.6"
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.0"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.6.0"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
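Just use Source.repeat(5000) instead of Source.single(5000). Source.single emits one element and then completes, which completes the zip stage as well; Source.repeat keeps emitting the same value for as long as there is demand. Note also that the zipper in the question multiplies (a * b); to get 5001, ..., 5100 it needs to add (a + b).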
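A minimal sketch of that suggestion, assuming Akka 2.6 as in the build.sbt above. To keep it short it makes a few choices that are not in the question: it uses a classic ActorSystem, it pairs the streams with zipWith rather than ZipLatestWith (so each element of Source(1 to 100) is combined with exactly one 5000), and it adds the values instead of multiplying them. The names RepeatExample and sums are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object RepeatExample {
  // The constant value stays a source of the stream, but it never completes on its own.
  val constantStream: Source[Int, NotUsed] = Source.repeat(5000)

  // Pairs every element of 1 to 100 with the constant and adds them.
  // zipWith completes as soon as the finite source completes.
  def sums()(implicit system: ActorSystem): immutable.Seq[Int] =
    Await.result(
      Source(1 to 100)
        .zipWith(constantStream)(_ + _)
        .runWith(Sink.seq),
      5.seconds
    )

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("repeat-example")
    try println(sums().mkString(", "))
    finally system.terminate()
  }
}
```

The same one-line change (Source.single to Source.repeat) applies to the GraphDSL version in the question. ZipLatestWith emits whenever either input delivers a new element, so zipWith is used here only because it makes the one-to-one pairing easy to see.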