scala akka-stream

Akka Streams: how to count distinct values in different sources


I have N finite, sorted sources of numbers (N may be different on each materialization). I need the result as a stream of these numbers, each paired with the number of times it appears across all sources.

For example:

1,3,5,7 -> |   |
1,5,7   -> | ? | -> (1,2),(2,1),(3,1),(4,1),(5,3),(7,2)
2,4,5   -> |   |

How can this be implemented?


Solution

  • Combine the sources into a single source, then aggregate the counts. Here is a helper object I use to combine several sources:

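    import akka.NotUsed
    import akka.stream.scaladsl.{Concat, Source}

    object ConcatSources {
      // Concatenates the sources in order: all of the first, then all of the second, and so on
      def apply[T](sources: Seq[Source[T, NotUsed]]): Source[T, NotUsed] =
        // Convert to List so the `::` patterns also match a Vector or any other Seq
        sources.toList match {
          case first :: second :: rest =>
            Source.combine(first, second, rest: _*)(Concat(_))
          case first :: Nil =>
            first
          case Nil =>
            Source.empty
        }
    }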
    

    Then the solution to your task:

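      import akka.actor.ActorSystem

      // Akka 2.6+: the implicit ActorSystem also provides the Materializer
      implicit val system: ActorSystem = ActorSystem("counts")
      val sources: Seq[Source[Int, NotUsed]] = Seq(
        Source[Int](List(1, 3, 5, 7)),
        Source[Int](List(1, 5, 7)),
        Source[Int](List(2, 4, 5))
      )
      ConcatSources(sources)
        // Count occurrences; emits a single Map once every source has completed
        .fold(Map.empty[Int, Int]) { (map, i) =>
          map.updated(i, map.getOrElse(i, 0) + 1)
        }
        // A Map has no guaranteed order, so sort by key before printing
        .runForeach(map => println(map.toList.sorted))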
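The fold above waits for every source to finish and then emits one Map. Because the inputs are already sorted, a streaming variant is also possible: merge them with `mergeSorted` so equal values become adjacent, then count each run of equal values. This is a minimal sketch, assuming Akka 2.6+ and that every source is sorted ascending (`mergeSorted` does not check this). `SortedCounts`, `mergeAllSorted` and `countRuns` are hypothetical names. The trailing `None` sentinel is there because in Akka 2.6 `statefulMapConcat` has no completion callback, so the last run would otherwise never be emitted.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object SortedCounts {
  // Merge already-sorted sources into one sorted source.
  // Assumption: each input is sorted ascending; mergeSorted does not verify this.
  def mergeAllSorted(sources: Seq[Source[Int, NotUsed]]): Source[Int, NotUsed] =
    sources.reduceOption((a, b) => a.mergeSorted(b)).getOrElse(Source.empty[Int])

  // Emit (value, count) for each run of equal adjacent values in a sorted stream.
  def countRuns(sorted: Source[Int, NotUsed]): Source[(Int, Int), NotUsed] =
    sorted
      .map(Option(_))
      .concat(Source.single(Option.empty[Int])) // sentinel that flushes the last run
      .statefulMapConcat { () =>
        var current: Option[Int] = None
        var count = 0
        (elem: Option[Int]) =>
          if (elem.isDefined && elem == current) {
            // Same value as the current run: extend it, emit nothing yet
            count += 1
            Nil
          } else {
            // A new value (or the sentinel) closes the previous run, if any
            val finished = current.map(v => (v, count)).toList
            current = elem
            count = 1
            finished
          }
      }

  def main(args: Array[String]): Unit = {
    // Akka 2.6+: the implicit ActorSystem also supplies the Materializer
    implicit val system: ActorSystem = ActorSystem("sorted-counts")
    val sources = Seq(
      Source(List(1, 3, 5, 7)),
      Source(List(1, 5, 7)),
      Source(List(2, 4, 5))
    )
    val counts = Await.result(countRuns(mergeAllSorted(sources)).runWith(Sink.seq), 5.seconds)
    // counts holds (1,2), (2,1), (3,1), (4,1), (5,3), (7,2) in this order
    println(counts)
    system.terminate()
  }
}
```

Unlike the fold, this emits each `(value, count)` pair as soon as its run ends and never holds all counts in memory, but it relies entirely on the inputs being sorted; with unsorted input the counts for a value would be split across several pairs.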