Search code examples
scalaakkaakka-stream

GraphStage with shape of 2-in 2-out


I need to write a custom GraphStage that has two input ports, and two output ports. This GraphStage will allow two otherwise independent flows to affect each other. What shape could I use for that? FanOutShape2 Has two outputs and FanInShape2 has two inputs, but how can I have a shape that has both? Somehow combine (inherit from) both? Use BidiFlow? Make my own?


Solution

  • Answering this myself, since this has been solved by the helpful guys on discuss.lightbend.com, see https://discuss.lightbend.com/t/graphstage-with-shape-of-2-in-and-2-out/4160/3

    The answer to this question is to simply use BidiShape. Despite the otherwise revealing name, the logic behind a BidiShape has to be by no means bi-directional (it's obvious in retrospect, but I was thrown off by this).

    Some code that can be used for reference if anybody is in a similar situation, where they have to do something based on two inputs, with the possibility to push to two outputs:

    class BiNoneCounter[T]() extends GraphStage[BidiShape[Option[T], Option[Int], Option[T], Option[Int]]] {
      private val leftIn = Inlet[Option[T]]("BiNoneCounter.in1")
      private val rightIn = Inlet[Option[T]]("BiNoneCounter.in2")
      private val leftOut = Outlet[Option[Int]]("BiNoneCounter.out1")
      private val rightOut = Outlet[Option[Int]]("BiNoneCounter.out2")
      override val shape = BidiShape(leftIn, leftOut, rightIn, rightOut)
    
      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
        private var grabNextPush = false
    
        val inHandler = new InHandler {
          override def onPush(): Unit = {
            if (grabNextPush) {
              (grab(leftIn), grab(rightIn)) match {
                // do stuff here
              }
            }
            grabNextPush = !grabNextPush
          }
        }
    
        val outHandler = (inlet: Inlet[Option[T]]) => new OutHandler {
          override def onPull(): Unit = {
            pull(inlet)
          }
        }
    
        setHandler(leftOut, outHandler(leftIn))
        setHandler(rightOut, outHandler(rightIn))
        setHandler(leftIn, inHandler)
        setHandler(rightIn, inHandler)
      }
    }
    

    Can be used like this:

            sourceOne ~> bidi.in1
                         bidi.out1 ~> sinkOne
            sourceTwo ~> bidi.in2
                         bidi.out2 ~> sinkTwo