scala, akka, akka-stream, reactive-streams

Akka streams reduce to smaller stream


I have an ordered stream of data:

A A A B B C C C C ... (very long)

And I want to transform it to a stream of aggregates in the form (item, count):

(A, 3) (B, 2) (C, 4)

What operators could I use in Akka Streams for this?

Source.fromPublisher(publisher)
    .aggregateSomehow()  // ?
    .runWith(sink)

I've looked into .groupBy, but it requires knowing the number of categories in advance, which I don't. I also believe it keeps all groups in memory, which I'd like to avoid. I should be able to discard (A, 3) once it has been processed and free the resources it consumes.

Edit: This question asks for similar functionality, but using SubFlows. However, SubFlows don't seem to be required, because I have a solution using the statefulMapConcat combinator.


Solution

  • One option is to use the statefulMapConcat combinator:

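    // The trailing "" is a sentinel that flushes the last group.
    Source(List("A", "A", "B", "B", "B", "C", "C", ""))
      .statefulMapConcat({ () =>
        // Per-materialization state: the current run and its length.
        var lastChar = ""
        var count = 0

        char => if (lastChar == char) {
            // Same run: keep counting, emit nothing yet.
            count += 1
            List.empty
          } else {
            // Run ended: emit it and start counting the new one.
            val charCount = (lastChar, count)
            lastChar = char
            count = 1
            List(charCount)
          }
      })
      .runForeach(println)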
    

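    However, this requires appending an extra element (here the empty string) to the input stream to mark the end; without it, the final group (C,2) would never be emitted. The first output, (,0), is just the initial placeholder state and can be ignored.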

    Output:

    (,0)
    (A,2)
    (B,3)
    (C,2)
    

    Thanks to @chunjef for the suggestion in the comments.
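
A possible variation on the same idea, as a rough sketch rather than a definitive implementation: wrap each element in an Option and append a single None inside the flow, so callers don't have to add a marker themselves and no placeholder (,0) is emitted. The names RunLengths and runLengths are made up for this illustration, and the code assumes Akka 2.6 or later, where an implicit ActorSystem is enough to run a stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka.
object RunLengths {

  // Collapses consecutive equal elements into (item, count) pairs.
  def runLengths(in: Source[String, NotUsed]): Source[(String, Int), NotUsed] =
    in
      .map(Option(_))                               // real elements become Some(x)
      .concat(Source.single(Option.empty[String]))  // a single None marks the end
      .statefulMapConcat { () =>
        // State holds only the current run, never the whole stream.
        var current: Option[String] = None
        var count = 0

        next =>
          if (next.isDefined && next == current) {
            // Still inside the same run: count it, emit nothing.
            count += 1
            Nil
          } else {
            // Run changed (or the stream ended): emit the finished run, if any.
            val finished = current.map(item => (item, count)).toList
            current = next
            count = 1
            finished
          }
      }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("run-lengths")
    val result = runLengths(Source(List("A", "A", "B", "B", "B", "C", "C")))
      .runWith(Sink.seq)
    // Emits (A,2), (B,3), (C,2), one per line.
    Await.result(result, 5.seconds).foreach(println)
    system.terminate()
  }
}
```

Only the current item and its count are kept as state, so each (item, count) pair can be dropped downstream as soon as it has been processed, which is the memory behaviour the question asks for.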