Using Akka GraphDSL with Zip stages


Consider the following code:

GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val in = Source(0 to 10)
  val fanOut = builder.add(Broadcast[Int](2))
  val toString = builder.add(Flow[Int].map(_.toString))
  val squared = builder.add(Flow[Int].map(x => x * x))
  val zip = builder.add(ZipLatestWith((str: String, sqr: Int) => (str, sqr)))
  val out = Sink.ignore

  in ~> fanOut ~> toString ~> zip ~> out
        fanOut ~> squared  ~> zip

  ClosedShape
}

I get an error at zip ~> out stating that "overloaded method ~> can't be applied to FanInShape2[String, Int, (String, Int)]". I can, of course, rewrite the graph composition in the following manner:

in ~> fanOut ~> toString ~> zip.in0; zip.out ~> out
      fanOut ~> squared  ~> zip.in1

But I have seen tutorials that show branching defined without the .in and .out specifics. Is there a limitation in the DSL regarding Zip stages, or am I doing something wrong?


Solution

  • It's a limitation of the DSL for zip stages compared with the merge/concat stages. With merge/concat, every inlet has the same type, so the DSL can treat the first inlet to be attached as in0, the next as in1, and so on. With zip stages the inlets can have different types, so that approach won't preserve type safety: there's no way in Scala to express "the first usage of in takes a Foo and the second takes a Bar". While it would be possible to define a Zip operator that zips two streams of the same type, there's also a case to be made for being explicit about which inlet maps to which half of the pair: it keeps a reordering of the graph-construction statements from changing the meaning of the elements in the stream (e.g. if zipping (x, y) coordinates, an unexpected change of order would be a reflection across the diagonal).
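
To make the reordering point concrete, here is a minimal sketch rather than a definitive implementation. It assumes Akka 2.6+ (where an implicit ActorSystem is enough to run a graph) and uses ZipWith instead of ZipLatestWith so that the pairing is deterministic. The object ZipPortsSketch and the helpers coordinates and runBoth are hypothetical names made up for illustration. Both graphs contain the same stages; only the inlet each branch is wired to differs.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, ZipWith}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical names throughout; assumes Akka 2.6+, where an implicit
// ActorSystem is enough to materialize a graph.
object ZipPortsSketch {

  // Zips each x in 0..3 with x * x. `swapped` only changes which branch
  // is wired to which inlet of the zip stage.
  def coordinates(swapped: Boolean): RunnableGraph[Future[Seq[(Int, Int)]]] =
    RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[(Int, Int)]) { implicit builder => out =>
        import GraphDSL.Implicits._

        val fanOut  = builder.add(Broadcast[Int](2))
        val squared = builder.add(Flow[Int].map(x => x * x))
        val zip     = builder.add(ZipWith((x: Int, y: Int) => (x, y)))

        Source(0 to 3) ~> fanOut.in
        if (swapped) {
          // The squared branch now feeds the first half of the pair.
          fanOut ~> squared ~> zip.in0
          fanOut            ~> zip.in1
        } else {
          fanOut            ~> zip.in0
          fanOut ~> squared ~> zip.in1
        }
        zip.out ~> out

        ClosedShape
      })

  // Runs both variants and returns (straight, mirrored).
  def runBoth(): (Seq[(Int, Int)], Seq[(Int, Int)]) = {
    implicit val system: ActorSystem = ActorSystem("zip-ports-sketch")
    try {
      val straight = Await.result(coordinates(swapped = false).run(), 5.seconds)
      val mirrored = Await.result(coordinates(swapped = true).run(), 5.seconds)
      (straight, mirrored)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (straight, mirrored) = runBoth()
    assert(straight == Seq((0, 0), (1, 1), (2, 4), (3, 9)))
    assert(mirrored == Seq((0, 0), (1, 1), (4, 2), (9, 3)))
  }
}
```

Because both inlets happen to be Int here, the swapped wiring still compiles and silently mirrors every point. If ~> zip picked inlets by attachment order, reordering two lines of graph construction would have the same effect, which is the argument for spelling out in0 and in1.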