Tags: scala, akka, akka-stream, alpakka

Akka Streams: How to construct a Source of Sources with GraphDSL?


Here's a simple scenario.

Let's begin with a single Akka Source: let's say, of rows retrieved from a database. Based on a partitioning function, different rows need to be diverted into different streams. So, we use a fan-out operator, for which we need GraphDSL (simple combinators such as .via and .to are not enough for this).

So, in the builder block of GraphDSL.create() { builder => [block]}, we create a Partition of the UniformFanOutShape and connect the original Source's outlet with the Partition's inlet. The Partition's outlets, for the time being, are hanging out in the breeze.

Next, we should format the rows in each separate stream as CSV. Alpakka has a CSV module, and we can use the akka.stream.alpakka.csv.scaladsl.CsvFormatting.format([arguments]) Flow for that purpose. So, we .add some CSV-formatting Flows to the builder (one per fanned-out stream), and connect each of the Partition's outlets to its corresponding Flow's inlet. The Flows' outlets are now the ones hanging out in the breeze.

Next, we should attach filenames to each CSV-formatted stream and zip them together. Alpakka has an Archive module, which can zip from multiple Sources. The problem is that akka.stream.alpakka.file.scaladsl.Archive.zip() returns a Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed]. It's a Flow of Sources (let's ignore the ArchiveMetadata part, which corresponds to the filenames), so we need a Source of Sources to pipe data from, or at least a collection of Sources (which we can pass to Source.apply).
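To make the target concrete, here is a minimal sketch of the input that Archive.zip() expects, assuming Alpakka's file and Akka Streams modules are on the classpath. The object name, file names and contents are made up for illustration; in the real pipeline the inner Sources would be the CSV-formatted streams.

```scala
import akka.NotUsed
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.collection.immutable

object ZipInputSketch {
  // Hypothetical file names and contents standing in for the CSV-formatted streams.
  val files: immutable.Seq[(ArchiveMetadata, Source[ByteString, Any])] = List(
    (ArchiveMetadata("first.csv"), Source.single(ByteString("a,b\r\n"))),
    (ArchiveMetadata("second.csv"), Source.single(ByteString("c,d\r\n")))
  )

  // Source.apply turns the collection into a Source of (metadata, Source) pairs,
  // which is exactly the element type that Archive.zip() consumes.
  val zipped: Source[ByteString, NotUsed] = Source(files).via(Archive.zip())
}
```

So the missing piece is only how to obtain the inner Sources from the graph's outlets.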

The graph we have been building so far has a collection of outlets hanging in the breeze. But we can only produce a single graph from that builder block, right? We can wrap it up as a Graph of AmorphousShape with no inlets and N outlets. The question is how to transform it into a collection of Sources, one per CSV-formatting outlet described above.
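For reference, the graph described so far might look roughly like the sketch below. It assumes Akka 2.6 and Alpakka's CSV module; the Row type, the sample rows, the number of branches and the partitioner are hypothetical placeholders, not part of the real setup.

```scala
import akka.NotUsed
import akka.stream.{AmorphousShape, Graph}
import akka.stream.alpakka.csv.scaladsl.CsvFormatting
import akka.stream.scaladsl.{GraphDSL, Partition, Source}

object HangingOutletsSketch {
  type Row = List[String] // hypothetical row type

  // Hypothetical stand-ins for the database rows and the partitioning function.
  val rows: Source[Row, NotUsed] = Source(List(List("1", "a"), List("2", "b")))
  val branches = 2
  val partitioner: Row => Int = row => row.head.toInt % branches

  val graph: Graph[AmorphousShape, NotUsed] = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val partition = builder.add(Partition[Row](branches, partitioner))
    rows ~> partition.in

    // One CSV-formatting Flow per Partition outlet; keep each Flow's open outlet.
    val csvOutlets = (0 until branches).map { i =>
      (partition.out(i) ~> CsvFormatting.format[Row]()).outlet
    }

    // No inlets, N outlets: a single graph, not a collection of Sources.
    AmorphousShape(Nil, csvOutlets.toList)
  }
}
```

This compiles into one Graph with N open outlets, which is precisely the shape that cannot be passed to Source.apply.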

Or is there another way entirely? Please suggest.

P.S. Looking at the Akka source code, we have:

  /**
   * Creates a new [[Graph]] by passing a [[GraphDSL.Builder]] to the given create function.
   */
  def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed] = {
    val builder = new GraphDSL.Builder
    val s = buildBlock(builder)

    createGraph(s, builder)
  }

What I'd love to have is something like this:

  def createMultiple[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => Seq[S]): Seq[Graph[S, NotUsed]] = {
    val builder = new GraphDSL.Builder
    val shapes = buildBlock(builder)

    // One graph per returned shape, all sharing the same builder
    shapes.map(createGraph(_, builder))
  }

Unfortunately, new GraphDSL.Builder is private to the package. The createGraph that takes a Shape is private, too. Oops.

The other problem with that is: if we do bypass the private keyword, and if there is more than one fanning-out stream (everything works fine with only one), then for each SourceShape in the constructed collection, not all Partition outlets appear to be connected (because the others are connected through the other sources), resulting in an error. I suspect traversals are the cause.


Solution

  • You could attach a Sink.asPublisher to each loose outlet in the graph: each Sink.asPublisher will materialize as a Reactive Streams Publisher (which is the Reactive Streams equivalent of an Akka Source). Those publishers can then be converted into Sources using Source.fromPublisher.

    The GraphDSL.create overload that takes a sequence of graphs to inject produces a graph whose materialized value is the sequence of the injected graphs' materialized values. Since every outlet of this graph is capped by a sink and only the Partition's inlet stays open, the graph itself has a SinkShape. Attach your source to that sink and run() it to get a sequence of publishers. From there it's just a matter of Source(publishers).map(Source.fromPublisher).via(Archive.zip()), with each inner Source paired with its ArchiveMetadata.
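A minimal sketch of that wiring, assuming Akka 2.6 and Alpakka's CSV module. The object name, the Row type and the two helper methods (partitionedSink, partitionedSources) are hypothetical names introduced here for illustration, not library API.

```scala
import akka.NotUsed
import akka.stream.{Materializer, SinkShape}
import akka.stream.alpakka.csv.scaladsl.CsvFormatting
import akka.stream.scaladsl.{GraphDSL, Partition, Sink, Source}
import akka.util.ByteString
import org.reactivestreams.Publisher

import scala.collection.immutable

object PartitionedSources {
  type Row = List[String] // hypothetical row type

  // A Sink that partitions rows, formats each branch as CSV and caps each
  // branch with Sink.asPublisher. It materializes one Publisher per branch.
  def partitionedSink(branches: Int,
                      partitioner: Row => Int): Sink[Row, immutable.Seq[Publisher[ByteString]]] = {
    val publisherSinks: immutable.Seq[Sink[ByteString, Publisher[ByteString]]] =
      immutable.Seq.fill(branches)(Sink.asPublisher[ByteString](fanout = false))

    Sink.fromGraph(GraphDSL.create(publisherSinks) { implicit builder => sinks =>
      import GraphDSL.Implicits._

      val partition = builder.add(Partition[Row](branches, partitioner))
      sinks.zipWithIndex.foreach { case (sink, i) =>
        partition.out(i) ~> CsvFormatting.format[Row]() ~> sink
      }
      // Only the Partition's inlet stays open, hence a SinkShape.
      SinkShape(partition.in)
    })
  }

  // Runs the upstream once and exposes each branch as a Source.
  // With fanout = false, each returned Source can be materialized only once.
  def partitionedSources(rows: Source[Row, Any], branches: Int, partitioner: Row => Int)(
      implicit mat: Materializer): immutable.Seq[Source[ByteString, NotUsed]] =
    rows
      .runWith(partitionedSink(branches, partitioner))
      .map(publisher => Source.fromPublisher(publisher))
}
```

One caveat, as far as I can tell: all of these Sources are fed by the same upstream, and Partition backpressures when the branch an element is routed to has no demand. If the consumer reads the Sources strictly one after another, the stream may stall, so a buffer per branch or concurrent consumption may be needed.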