Search code examples
scalaakka-stream

Akka Streams why partition is already connected when not using builder.add?


I'm trying out Akka Stream API and I have no idea why this throws java.lang.IllegalArgumentException: [Partition.in] is already connected in line 5

  val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val intSource = Source.fromIterator(() => Iterator.continually(Random.nextInt(10).toString))
  val validateInput: Flow[String, Message, NotUsed] = Flow[String].map(Message.fromString)
  val validationPartitioner = Partition[Message](2, { // #5 error here
    case _: Data => 0
    case _ => 1
  })

  val outputStream = Sink.foreach[Message](println(_))
  val errorStream = Sink.ignore

  intSource ~> validateInput ~> validationPartitioner.in 
      validationPartitioner.out(0) ~> outputStream
      validationPartitioner.out(1) ~> errorStream

  ClosedShape
})

but if I change validationPartitioner to be wrapped in builder.add(...) and remove .in from

intSource ~> validateInput ~> validationPartitioner.in 

Everything works. If I just remove .in the code doesn't compile. Why usage of builder is being forced and am I missing something or is it a bug?


Solution

  • All of the components of a graph must be added to the builder, but there are variants of the ~> operator that add the most commonly used components, such as Source and Flow, to the builder under the covers (see here and here). However, junction operations that perform a fan-in (such as Merge) or a fan-out (such as Partition) must be explicitly passed to builder.add if you're using the Graph DSL.