scala akka akka-stream

Concatenating two Flows in Akka Streams


I am trying to concat two Flows, but I cannot explain the output of my implementation.

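import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Concat, Flow, GraphDSL, Sink, Source}

// Assumption: Akka 2.6+, where the ActorSystem supplies the implicit materializer
implicit val system: ActorSystem = ActorSystem("concat-example")

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

// Fan the input out to both flows, then concatenate their outputs
val flowGraph = Flow.fromGraph(
  GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))
    val broadcast = builder.add(Broadcast[Int](2))

    broadcast ~> flow1 ~> concat.in(0)
    broadcast ~> flow2 ~> concat.in(1)

    FlowShape(broadcast.in, concat.out)
  }
)

source.via(flowGraph).runWith(sink)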

I expect the following output from this code.

2
3
4
.
.
.
11
10
20
.
.
.
100

Instead, only "2" is printed. Can you please explain what is wrong with my implementation and how I should change the program to get the desired output?


Solution

  • From the Akka Streams API docs:

    Concat:

    Emits when the current stream has an element available; if the current input completes, it tries the next one

    Broadcast:

    Emits when all of the outputs stops backpressuring and there is an input element available

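    The two operators cannot work together here because their semantics conflict: Concat pulls from its second input only after the first one has completed, whereas Broadcast emits an element only when there is demand on ALL of its outputs. After the first element, the flow2 branch is backpressured by Concat, so Broadcast never emits again, the flow1 branch never completes, and the stream stalls after printing "2".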
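
To make the conflict concrete, here is a minimal sketch (not part of the original answer) that keeps the Broadcast/Concat graph but puts a buffer on the second branch, so that Broadcast always sees demand there while Concat is still draining the first branch. It assumes Akka 2.6+ (implicit materializer from the ActorSystem) and, importantly, that the stream never carries more elements than the hypothetical bufferSize; with a longer or unbounded stream the buffer fills up and the graph stalls again. The object name BufferedConcatSketch is made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Concat, Flow, GraphDSL, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BufferedConcatSketch {
  // Runs the buffered graph over 1 to 10 and returns everything it emits.
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6+, where the ActorSystem supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("buffered-concat-sketch")
    try {
      val flow1 = Flow[Int].map(_ + 1)
      val flow2 = Flow[Int].map(_ * 10)

      // Assumption: the source never emits more than bufferSize elements.
      val bufferSize = 10

      val flowGraph = Flow.fromGraph(GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._

        val broadcast = builder.add(Broadcast[Int](2))
        val concat = builder.add(Concat[Int](2))

        broadcast ~> flow1 ~> concat.in(0)
        // The buffer holds the second branch's elements until Concat
        // has finished with the first input and starts pulling this one.
        broadcast ~> flow2.buffer(bufferSize, OverflowStrategy.backpressure) ~> concat.in(1)

        FlowShape(broadcast.in, concat.out)
      })

      Await.result(Source(1 to 10).via(flowGraph).runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().mkString(" "))
}
```

Under those assumptions the run should yield 2 through 11 followed by 10 through 100. Because the whole second branch has to fit in memory, this is only reasonable for small, finite streams.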

    For what you need, you could concatenate the two transformed sources with concat, as suggested by commenters:

    source.via(flow1).concat(source.via(flow2)).runWith(sink)
    

    or, equivalently, use Source.combine:

    Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
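
For completeness, the two one-liners above can be wrapped into a self-contained program roughly as follows. This is a sketch assuming Akka 2.6+ (so the ActorSystem provides the implicit materializer); the object name ConcatSources and the use of Sink.seq instead of Sink.foreach(println) are choices made here so the result can be inspected, not something from the question.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Concat, Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object ConcatSources {
  // Returns the elements produced by each of the two variants.
  def run(): (Seq[Int], Seq[Int]) = {
    // Assumption: Akka 2.6+, where the ActorSystem supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("concat-sources")
    try {
      val source = Source(1 to 10)
      val flow1 = Flow[Int].map(_ + 1)
      val flow2 = Flow[Int].map(_ * 10)

      // Variant 1: the concat operator on Source.
      val viaConcat =
        source.via(flow1).concat(source.via(flow2)).runWith(Sink.seq)

      // Variant 2: Source.combine with Concat as the fan-in strategy.
      val viaCombine =
        Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(Sink.seq)

      (Await.result(viaConcat, 5.seconds), Await.result(viaCombine, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (fromConcat, fromCombine) = run()
    println(fromConcat.mkString(" "))
    println(fromCombine.mkString(" "))
  }
}
```

Both variants produce 2 through 11 followed by 10 through 100. Note that source is materialized twice here, once per branch, which is harmless for Source(1 to 10) but means a source with side effects (for example, one reading from a file or a queue) would be run twice.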