In the akka-stream docs there is an example of how to use GraphDSL.create to construct graphs of nonlinear shapes. The simplest code sample, for a graph with one fan-out element, follows:
val graph = GraphDSL.create() { implicit builder =>
  // Brings the ~> combinators into scope
  import GraphDSL.Implicits._

  val source = Source(1 to 100)
  val flow = Flow.apply[Int]
  val sink1 = Sink.foreach[Int](println)
  val sink2 = Sink.foreach[Int](i => println("Sink2: " + i))
  // The fan-out stage is explicitly added to the builder
  val broadcast = builder.add(Broadcast[Int](2))

  source ~> flow ~> broadcast.in
  broadcast.out(0) ~> sink1
  broadcast.out(1) ~> sink2
  ClosedShape
}
RunnableGraph.fromGraph(graph).run()
My question is: why must an instance of Broadcast be 'imported' with the help of the builder.add method, while for source/sink/flow a regular instantiation is enough?
If I remove builder.add and just leave val broadcast = Broadcast[Int](2), the code still compiles, but at runtime an exception is thrown:
Exception in thread "main" java.lang.IllegalArgumentException: [Broadcast.in] is already connected
The documentation does not explain this clearly and just presents it as a given. Could you clarify?
The GraphDSL lets you build a graph through a series of stateful operations on the builder. For this reason, every stage of your graph must be added to the builder. The builder.add operation returns the Shape of a stage, which is basically its blueprint: the set of its input and output ports. The Shape can be used with the ~> combinators to compose your graph.
Some types of stages, however, are used more often than others (namely Source, Sink, and Flow), so for these stages Akka provides convenience functions that add them to the builder for you as part of the ~> call.
To understand this better, take a look at the differences between this function and this function in the Akka source code. These are the ~> functions for Sinks.
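To make the difference concrete, here is a minimal sketch in which every stage is added to the builder by hand, which is roughly what the convenience ~> overloads do for you behind the scenes. It assumes Akka 2.6 or later (so that an implicit ActorSystem is enough to materialize the graph); the object name ExplicitAddExample and the system name are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}

// Hypothetical example object, not taken from the Akka docs.
object ExplicitAddExample extends App {
  // Assumption: Akka 2.6+, where the implicit system provides the materializer.
  implicit val system: ActorSystem = ActorSystem("explicit-add")

  val graph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Each builder.add call registers the stage and returns its Shape,
    // whose ports are known to this builder.
    val source    = builder.add(Source(1 to 3))             // SourceShape[Int]
    val broadcast = builder.add(Broadcast[Int](2))          // UniformFanOutShape[Int, Int]
    val sink1     = builder.add(Sink.foreach[Int](println)) // SinkShape[Int]
    val sink2     = builder.add(Sink.foreach[Int](i => println("Sink2: " + i)))

    // Only ports (Outlet ~> Inlet) are wired here, so nothing is added implicitly.
    source.out ~> broadcast.in
    broadcast.out(0) ~> sink1.in
    broadcast.out(1) ~> sink2.in
    ClosedShape
  }

  RunnableGraph.fromGraph(graph).run()
}
```

Writing source ~> broadcast.in with a plain Source instead should behave the same way, because that overload adds the Source to the builder before wiring its port. A bare Broadcast[Int](2), on the other hand, exposes in and out ports directly, so the port-to-port overload is chosen and nothing ever registers the stage with the builder. Note that this sketch never terminates the ActorSystem, so the JVM keeps running after the elements are printed.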