Tags: scala, akka, akka-stream

Is it possible to extract the substream key in Akka Streams?


I can't find any documentation on this, but I know that Akka Streams keeps in memory the keys used to split a stream into substreams when calling groupBy. Is it possible to extract those keys from a substream? Say I create a bunch of substreams from my main stream, pass each one through a fold that counts its elements, and then store the count in a class. Can I also get the substream's key to pass to that class? Or is there a better way of doing this? I need to count the elements in each substream, but I also need to record which group each count belongs to.


Solution

  • A nice example is shown in the Akka Streams cookbook (the reduce-by-key recipe):

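    import akka.NotUsed
    import akka.stream.scaladsl.{Sink, Source}

    // upper bound on distinct keys; groupBy fails the stream if more keys appear
    val MaximumDistinctWords = 1000

    val counts: Source[(String, Int), NotUsed] = words
      // split the words into separate streams first
      .groupBy(MaximumDistinctWords, identity)
      // pair each element with its key (the word itself) and a count of 1
      .map(_ -> 1)
      // add counting logic to the streams, keeping the key from the first pair
      .reduce((l, r) => (l._1, l._2 + r._2))
      // get a stream of word counts
      .mergeSubstreams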
    

    Then, given the following words source (declared before counts), running the stream:

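    import akka.actor.ActorSystem
    implicit val system: ActorSystem = ActorSystem("word-count") // provides the Materializer (Akka 2.6+)
    val words = Source(List("Hello", "world", "let's", "say", "again", "Hello", "world"))
    counts.runWith(Sink.foreach(println))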
    

    will print the following (the order of the lines is not guaranteed):

    (world,2)
    (Hello,2)
    (let's,1)
    (again,1)
    (say,1)
    

    Another example I thought of is counting numbers by their remainder modulo 4. For example, the following:

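    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}
    implicit val system: ActorSystem = ActorSystem("remainders") // provides the Materializer (Akka 2.6+)

    Source(0 to 101)
      .groupBy(10, x => x % 4)   // one substream per remainder; 10 is only an upper bound
      .map(e => e % 4 -> 1)      // pair each element with its key and a count of 1
      .reduce((l, r) => (l._1, l._2 + r._2))
      .mergeSubstreams.to(Sink.foreach(println)).run()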
    

    will print (the order of the lines may vary):

    (0,26)
    (1,26)
    (2,25)
    (3,25)
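    As a quick sanity check on those numbers: 0 to 101 contains 102 integers, so remainders 0 and 1 occur 26 times each and remainders 2 and 3 occur 25 times each. The sketch below computes the same counts with plain Scala collections (Seq.groupBy, not Akka Streams), so it runs without an ActorSystem; RemainderCounts and counts are hypothetical names:

    ```scala
    // Plain Scala collections, no Akka: group numbers by remainder and count each group.
    object RemainderCounts {
      def counts(xs: Seq[Int], modulus: Int): Map[Int, Int] =
        xs.groupBy(_ % modulus).map { case (rem, group) => rem -> group.size }

      def main(args: Array[String]): Unit =
        // sort the pairs so the output order is deterministic
        counts(0 to 101, 4).toSeq.sorted.foreach(println)
    }
    ```

    It prints the same four pairs as the stream version, in ascending order of remainder.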