I am trying to write an Akka Streams graph. The code I have written is:
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
  (sink1, sink2) =>
    import GraphDSL.Implicits._
    val bcast = builder.add(Broadcast[Row](2))
    val flow = source ~> flow1 ~> flow2
    flow.out ~> bcast.in
    bcast.out(0) ~> sink1
    bcast.out(1) ~> flow3 ~> flow4 ~> sink2
    ClosedShape
})
val (f1, f2) = graph.run()
val consolidated = Future.sequence(List(f1, f2))
Await.result(consolidated, Duration.Inf)
This code does not compile because I cannot connect the out of flow to the in of bcast.
I could connect the out of the source directly to the in of bcast, but that is not what I want: flow1 and flow2 are common to both branches, so the graph must branch only after flow2.
Also, I am not sure I am writing the graph correctly, because it returns two futures of Done and I have to combine them into a single future manually using Future.sequence.
You can't wire your graph in 2 steps, as the ~> combinator does not give you back a flow. It is in fact a stateful, declarative operation.
A better approach here would be to wire your graph in one go, e.g.
source ~> flow1 ~> flow2 ~> bcast
bcast ~> sink1
bcast ~> flow3 ~> flow4 ~> sink2
Alternatively, you can split the declarations by adding a stage to the builder (and retrieving its shape), e.g.
val flow2s = builder.add(flow2)
source ~> flow1 ~> flow2s.in
flow2s.out ~> bcast
bcast ~> sink1
bcast ~> flow3 ~> flow4 ~> sink2
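For reference, here is a minimal self-contained sketch of the one-go wiring. The Int/String element types, the concrete stages, and the object and method names are placeholders of mine rather than the asker's Row pipeline, and it assumes Akka 2.6, where an implicit ActorSystem is enough to materialize a graph.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Future

object BroadcastGraphSketch {
  // Assumption: Akka 2.6+, so the implicit ActorSystem supplies the materializer.
  implicit val system: ActorSystem = ActorSystem("broadcast-sketch")

  // Hypothetical stand-ins for the question's source, flow1..flow4, sink1 and sink2.
  val source = Source(1 to 5)
  val flow1 = Flow[Int].map(_ + 1)
  val flow2 = Flow[Int].map(_ * 2)
  val flow3 = Flow[Int].filter(_ % 4 == 0)
  val flow4 = Flow[Int].map(_.toString)
  val sink1: Sink[Int, Future[Done]] = Sink.foreach[Int](n => println(s"sink1: $n"))
  val sink2: Sink[String, Future[Done]] = Sink.foreach[String](s => println(s"sink2: $s"))

  val graph: RunnableGraph[(Future[Done], Future[Done])] =
    RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
      (s1, s2) =>
        import GraphDSL.Implicits._
        val bcast = builder.add(Broadcast[Int](2))
        // The shared prefix is wired once, straight into the broadcast junction.
        source ~> flow1 ~> flow2 ~> bcast
        // Each use of bcast on the left takes the next free output port.
        bcast ~> s1
        bcast ~> flow3 ~> flow4 ~> s2
        ClosedShape
    })

  // Runs the graph and completes once both sinks have completed.
  def run(): Future[(Done, Done)] = {
    val (f1, f2) = graph.run()
    f1.zip(f2)
  }
}
```

Inside the builder block, use the shapes passed in (s1, s2 here) rather than the outer sinks, since those are the instances whose materialized values are exposed. Remember to call system.terminate() when you are done with the stream.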
Regarding the materialized Futures, you need to choose what is meaningful as a materialized value of your graph as a whole. If you only need one of the 2 Sinks' materialized Futures, you need to pass only that one to the GraphDSL.create method.
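As a rough sketch of the single-Future variant, only sink2 is handed to GraphDSL.create below, so the graph materializes just that sink's Future[Done]. The stages, element types, and names are again placeholders I made up, and Akka 2.6 is assumed (newer 2.6.x releases may flag this overload as deprecated in favour of createGraph, but it should still compile).

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Future

object SingleMatSketch {
  // Assumption: Akka 2.6+, so the implicit ActorSystem supplies the materializer.
  implicit val system: ActorSystem = ActorSystem("single-mat-sketch")

  // Hypothetical stand-ins for the question's stages.
  val source = Source(1 to 5)
  val shared = Flow[Int].map(_ * 2)
  val sink1 = Sink.ignore
  val sink2: Sink[Int, Future[Done]] = Sink.foreach[Int](n => println(s"sink2: $n"))

  val graph: RunnableGraph[Future[Done]] =
    RunnableGraph.fromGraph(GraphDSL.create(sink2) { implicit builder => s2 =>
      import GraphDSL.Implicits._
      val bcast = builder.add(Broadcast[Int](2))
      source ~> shared ~> bcast
      // sink1 is wired in directly, so its materialized value is dropped.
      bcast ~> sink1
      // s2 is the imported shape whose Future becomes the graph's materialized value.
      bcast ~> s2
      ClosedShape
    })

  def run(): Future[Done] = graph.run()
}
```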
If, otherwise, you are interested in both Futures, it makes perfect sense to sequence or zip them together.
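To illustrate the difference between the two combinators, here is a small sketch using only the Scala standard library. The futures are stand-ins I made up for the two materialized values (the real ones would both be Future[Done]), and the object and method names are hypothetical.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object CombineFutures {
  // zip pairs two futures and keeps each result's own type.
  def zipped(): Future[(String, Int)] = {
    val f1: Future[String] = Future("sink1 done")
    val f2: Future[Int] = Future(2)
    f1.zip(f2)
  }

  // Future.sequence turns a collection of futures with one element type
  // into a single future of a collection, preserving the order.
  def sequenced(): Future[List[Int]] =
    Future.sequence(List(Future(1), Future(2)))
}
```

With exactly two futures, zip is usually the lighter choice because it gives you a tuple; sequence becomes more convenient once you have a list of futures of the same type. Both fail if any of the underlying futures fails.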