I have concrete subclass of GraphStage
that defines some custom logic that is influenced by the class parameters.
I would like users of my application to be able to supply a Seq
of these custom GraphStages
. When building the RunnableGraph
I would like to add edges between the Source
and the first stage in the Seq
, then between each stage in order, and finally the Sink
. In other words: src ~> stages.reduce(_ ~> _) ~> sink
Unfortunately this doesn't compile. I think the reason might be related to operator precedence. I tried being more explicit using .via
or .foldLeft
but I couldn't quite get it right.
This feels like this kind of thing should have a fairly straightforward syntax. Am I missing an operator in the docs? Is this kind of dynamic graph not possible for some reason?
Below is a fabricated example of this pattern using simple stages of String => String
. It includes my incompilable code that logically represents the graph I want to express.
import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.stream._
import scala.concurrent.Future
case class MyStage[T](/* ... params ... */) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("MyStage.in")
val out = Outlet[T]("MyStage.out")
val shape: FlowShape[T, T] = FlowShape.of(in, out)
def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ??? // Depends on params
}
case class MyApp(stages: Seq[MyStage[String]]) {
val out = Sink.seq[String]
val graph = RunnableGraph.fromGraph(GraphDSL.create(out) { implicit b: GraphDSL.Builder[Future[Seq[String]]] =>
sink =>
import GraphDSL.Implicits._
val src: Source[String, NotUsed] = Source(Seq("abc", "hello world", "goodbye!"))
// This is what I logically want to do.
src ~> stages.reduce(_ ~> _) ~> sink
ClosedShape
}
}
You can create flow of your stages like this:
val graph = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val stagesShapes = stages.map(b.add(_))
stagesShapes.reduce { (s1, s2) =>
s1 ~> s2
FlowShape(s1.in, s2.out)
}
}
Then all you need is connect source and sink to this flow and run it.