
Why must builder.add be used for junctions but not for Source/Flow/Sink in GraphDSL.create?


The akka-stream docs include an example of using GraphDSL.create to construct graphs with nonlinear shapes. Here is the simplest version, a graph with a single fan-out element:

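import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl._

// Assuming Akka 2.6+: an implicit ActorSystem also provides the Materializer used by run()
implicit val system: ActorSystem = ActorSystem("graph-example")

val graph = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._ // brings the ~> operators into scope
  val source = Source(1 to 100)
  val flow = Flow[Int]
  val sink1 = Sink.foreach[Int](println)
  val sink2 = Sink.foreach[Int](i => println("Sink2: " + i))
  val broadcast = builder.add(Broadcast[Int](2)) // the junction is added explicitly
  source ~> flow ~> broadcast.in
  broadcast.out(0) ~> sink1
  broadcast.out(1) ~> sink2
  ClosedShape
}
RunnableGraph.fromGraph(graph).run()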

My question is: why must the Broadcast instance be 'imported' with builder.add, while for the Source, Sink and Flow a regular instantiation is enough?

If I remove builder.add and just write val broadcast = Broadcast[Int](2), the code compiles, but an exception is thrown at runtime:

Exception in thread "main" java.lang.IllegalArgumentException: [Broadcast.in] is already connected

The documentation does not explain this; it just presents it as a given. Could you clarify?


Solution

  • The GraphDSL builds a graph through a series of stateful operations on a mutable builder, so every stage of your graph must be added to that builder. The builder.add operation imports a stage and returns a fresh copy of its Shape, which is essentially its blueprint: the inlets and outlets you can wire up. That Shape can then be used with the ~> combinators to compose your graph.

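    Some types of stages, however, are used far more often than others (namely Source, Sink and Flow), so for these Akka provides convenience methods in GraphDSL.Implicits that add them to the builder for you as part of the ~> call. This works because such a stage has at most one inlet and one outlet, so ~> knows which port to connect. A junction such as Broadcast has several ports that you must address individually (out(0), out(1)), so you need to keep a reference to the Shape returned by builder.add.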

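    To understand this better, compare the two ~> overloads for Sinks in the Akka source code (GraphDSL.Implicits): the one that takes a Graph[SinkShape[T], _] calls builder.add on the sink before connecting it, while the one that takes a SinkShape[T] connects directly to a shape that has already been added.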
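The difference between explicit and implicit importing can be seen in a minimal sketch, assuming Akka 2.6+ (where an implicit ActorSystem supplies the Materializer) and the akka-stream dependency on the classpath. The names BuilderAddSketch, collect and doubler are hypothetical. The Source and the Broadcast are imported explicitly with builder.add; the two Sink.seq stages are imported by GraphDSL.create itself, so their materialized values can be returned; and one doubler Flow value is passed to ~> twice.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BuilderAddSketch {
  // Hypothetical helper: runs the graph and returns what each sink collected.
  def collect(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("builder-add-sketch")
    try {
      // A single immutable blueprint; every ~> that receives it imports a new copy.
      val doubler = Flow[Int].map(_ * 2)

      // GraphDSL.create imports both Sink.seq graphs and passes their SinkShapes
      // to the build block; (_, _) keeps both materialized Futures.
      val graph = GraphDSL.create(Sink.seq[Int], Sink.seq[Int])((_, _)) { implicit builder => (sink1, sink2) =>
        import GraphDSL.Implicits._

        // Explicit import: builder.add returns the stage's Shape.
        val source = builder.add(Source(1 to 5))        // SourceShape[Int]
        val broadcast = builder.add(Broadcast[Int](2))  // UniformFanOutShape[Int, Int]

        source ~> broadcast.in
        // Implicit import: the Graph[FlowShape] overload of ~> calls builder.add(doubler)
        // on each use, so these two lines create two independent Flow stages.
        broadcast.out(0) ~> doubler ~> sink1
        broadcast.out(1) ~> doubler ~> sink2
        ClosedShape
      }

      val (f1, f2) = RunnableGraph.fromGraph(graph).run()
      (Await.result(f1, 5.seconds), Await.result(f2, 5.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (r1, r2) = collect()
    println(s"sink1: $r1")
    println(s"sink2: $r2")
  }
}
```

Reusing doubler is safe because each ~> imports its own copy. The Broadcast, by contrast, is added once and referenced through its Shape, because out(0) and out(1) must both belong to the same imported junction.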