Search code examples
scalaakka-stream

Feeding the output of a Flow to a Broadcast in Akka Streams Graph


I am trying to write a Akka Stream graph. The code I have written is

val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
   (sink1, sink2) =>
      import GraphDSL.Implicits._
      val bcast = builder.add(Broadcast[Row](2))
      val flow = source ~> flow1 ~> flow2
      flow.out ~> bcast.in
      bcast.out(0) ~> sink1
      bcast.out(1) ~> flow3 ~> flow4 ~> sink2
      ClosedShape
})

val (f1, f2) = graph.run()
val consolidated = Future.sequence(List(f1, f2))
Await.result(consolidated, Duration.Inf)

This code does not compile because I cannot connect the out of flow to the in of bcast.

I can connect the out of the source to the in of the bcast, but I cannot do that because some portion is common between the two branches. So I must create the branch in the graph only after flow2

Also... I am not sure if I am writing the Graph correctly because it is returning two futures of Done and I need to combine them into a single future manually using Sequence.


Solution

  • You can't wire your graph in 2 steps, as the ~> combinator does not give you back a flow. It is in fact a stateful, declarative operation.

    A better approach here would be to wire your graph in one go, e.g.

      source ~> flow1 ~> flow2 ~> bcast
                                  bcast          ~>          sink1
                                  bcast ~> flow3 ~> flow4 ~> sink2
    

    or, alternatively you can split the declarations by adding a stage to the builder (and retrieving its shape), e.g.

      val flow2s = builder.add(flow2)
    
      source ~> flow1 ~> flow2s.in
      flow2s.out ~> bcast
                    bcast          ~>          sink1
                    bcast ~> flow3 ~> flow4 ~> sink2
    

    Regarding the materialized Futures, you need to choose what is meaningful as a materialized value of your graph as a whole. If you only need one of the 2 Sinks materialized Futures, you need to pass only that one to the GraphDSL.create method. If, otherwise, you are interested in both Futures, it makes perfect sense to sequence or zip them together.