Search code examples
scalaakkaakka-stream

How to reduce `.via()` down a Seq of custom GraphStage?


I have concrete subclass of GraphStage that defines some custom logic that is influenced by the class parameters.

I would like users of my application to be able to supply a Seq of these custom GraphStages. When building the RunnableGraph I would like to add edges between the Source and the first stage in the Seq, then between each stage in order, and finally the Sink. In other words: src ~> stages.reduce(_ ~> _) ~> sink

Unfortunately this doesn't compile. I think the reason might be related to operator precedence. I tried being more explicit using .via or .foldLeft but I couldn't quite get it right.

This feels like this kind of thing should have a fairly straightforward syntax. Am I missing an operator in the docs? Is this kind of dynamic graph not possible for some reason?

Below is a fabricated example of this pattern using simple stages of String => String. It includes my incompilable code that logically represents the graph I want to express.

import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.stream._

import scala.concurrent.Future

case class MyStage[T](/* ... params ... */) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("MyStage.in")
  val out = Outlet[T]("MyStage.out")
  val shape: FlowShape[T, T] = FlowShape.of(in, out)
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ??? // Depends on params
}


case class MyApp(stages: Seq[MyStage[String]]) {
  val out = Sink.seq[String]

  val graph = RunnableGraph.fromGraph(GraphDSL.create(out) { implicit b: GraphDSL.Builder[Future[Seq[String]]] =>
    sink =>
      import GraphDSL.Implicits._
      
      val src: Source[String, NotUsed] = Source(Seq("abc", "hello world", "goodbye!"))

      // This is what I logically want to do.
      src ~> stages.reduce(_ ~> _) ~> sink

      ClosedShape
  }
}

Solution

  • You can create flow of your stages like this:

         val graph = GraphDSL.create() { implicit b =>
              import GraphDSL.Implicits._
    
              val stagesShapes = stages.map(b.add(_))
    
              stagesShapes.reduce { (s1, s2) => 
                s1 ~> s2
                FlowShape(s1.in, s2.out)
              }
          }
    

    Then all you need is connect source and sink to this flow and run it.