Search code examples
scalaakkaakka-stream

Source.Single() terminates stream with 2 sources prematurely


I have defined a simple graph that combines a constant stream (defined via Source.Single(5000)) with a source that is non-constant (e.g. Source(1 to 100)). My graph adds the two numbers together via a ZipLatestWith operator. The intention is for the output of the stream to be 5001, 5002, 5003, 5004, ..., 5100. In reality the output of the program is simply 5000, and then the stream terminates presumably because the Single source completes.

How do I get the expected result, where the source with the constant value of 5000 is combined with every value of the non-constant source? Note that due to conceptual reasons (irrelevant to this specific example), it's important that the constant source remains a source of the stream. A complete code example is as follows, which simply prints the number 5000 to the console.

Main.scala:

import akka.NotUsed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Behavior, Terminated}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, ZipLatestWith}
import akka.stream.{FlowShape, Materializer}

object Main {
  def apply(): Behavior[NotUsed] = {
    Behaviors.setup { context =>

      val constantStream: Source[Int, NotUsed] = Source.single(5000)
      val graph: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create(){
        implicit builder =>
          import GraphDSL.Implicits._

          val zipper = builder.add(ZipLatestWith[Int, Int, Int]((a: Int, b: Int) => a * b))
          constantStream ~> zipper.in1

          FlowShape(zipper.in0, zipper.out)
      })

      Source(1 to 100)
        .via(graph)
        .to(Sink.foreach(println)).run()(Materializer.apply(context))

      Behaviors.receiveSignal {
        case (_, Terminated(_)) =>
          Behaviors.stopped
      }
    }
  }

  def main(args: Array[String]): Unit = {
    ActorSystem(Main(), "test")
  }
}

build.sbt:

scalaVersion := "2.12.6"
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.0"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.6.0"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"

Solution

  • Just use Source.repeat(5000) instead.