Search code examples
scalaakkaakka-stream

Akka Streams: How to form inlets and outlets for a Graph using a Flow


I have some code that is similar to the following:

object Test extends App {
  val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val input = builder.add(Balance[Int](1)) //Question 1) how to get rid of this input
      val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
      val balance = builder.add(Balance[Int](2))

      val flow1 = Flow[Int].map(_*2)
      val flow2 = Flow[Int].map(_*2)

      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
        left + right
      }))

      val flow3 = Flow[Int].map(_*2)

      input ~> buffer ~> balance.in
      balance.out(0) ~> flow1 ~> zip.in0
      balance.out(1) ~> flow2 ~> zip.in1
      zip.out ~> flow3

      FlowShape(input.in, flow3) //Question 2) how to make an outlet here
    })
}

Notice that I had to add a Balance called input, because I cannot retrieve an Inlet from the first Buffer of the FlowShape I want to create. Is there any other simpler way to solve this? Creating a Balance with 1 Outlet seems to be the wrong way to do this.

My second question is similar. I cannot retrieve an Outlet from flow3. The only way I know to solve this problem is to create yet another Balance, and expose its Outlet as the Outlet of the entire FlowShape. Any better way to solve this problem?


Solution

  • A Balance is a fan-out shape that emits to the first available output. Considering you are zipping the flows in the next step, what you need is a Broadcast. It will fan-out to all outputs when all of them are available.

    Also, the builder can add any of the shapes that are a Graph, this includes Flow. You don't have to use a custom shape for that.

    The updated code:

    object Test extends App {
      val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
        Flow.fromGraph(GraphDSL.create() { implicit builder =>
          import GraphDSL.Implicits._
    
          val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
          val input = builder.add(buffer) 
          val broadcast = builder.add(Broadcast[Int](2))
    
          val flow1 = Flow[Int].map(_*2)
          val flow2 = Flow[Int].map(_*2)
    
          val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
            left + right
          }))
    
          val flow3 = builder.add(Flow[Int].map(_*2))
    
          input ~> broadcast.in
          broadcast.out(0) ~> flow1 ~> zip.in0
          broadcast.out(1) ~> flow2 ~> zip.in1
          zip.out ~> flow3.in
    
          FlowShape(input.in, flow3.out) 
        })
    }