I have N finite sorted sources of numbers (N may be different on each materialization). I need the result as a stream of these numbers, each paired with the count of how many times it appears across all sources.
For example:
1,3,5,7 -> |   |
1,5,7   -> | ? | -> (1,2),(2,1),(3,1),(4,1),(5,3),(7,2)
2,4,5   -> |   |
How can this be implemented?
Basically, you combine the sources into one and then aggregate the data. Here is a helper object that I use to concatenate several sources into a single source:
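import akka.NotUsed
import akka.stream.scaladsl.{Concat, Source}

object ConcatSources {
  // Concatenates the given sources, in order, into a single source.
  def apply[T](sources: Seq[Source[T, NotUsed]]): Source[T, NotUsed] =
    // Convert to List first: the :: patterns below only match a List,
    // so a Vector or other Seq would otherwise fail with a MatchError.
    sources.toList match {
      case first :: second :: rest =>
        Source.combine(first, second, rest: _*)(Concat(_))
      case first :: _ =>
        first
      case Nil =>
        Source.empty
    }
}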
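Then the solution to your task is to fold the combined source into a map of counts. Note that fold emits a single Map only after all sources have completed, so the counts arrive together at the end rather than one by one: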
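// Requires an implicit materializer (or an implicit ActorSystem on Akka 2.6+) in scope.
val sources: Seq[Source[Int, NotUsed]] = Seq(
  Source[Int](List(1, 3, 5, 7)),
  Source[Int](List(1, 5, 7)),
  Source[Int](List(2, 4, 5))
)

ConcatSources(sources)
  .fold(Map.empty[Int, Int]) { (counts, i) =>
    counts.updated(i, counts.getOrElse(i, 0) + 1)
  }
  // Map iteration order is unspecified, so sort by key before printing.
  .runForeach(counts => println(counts.toList.sorted))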
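
The fold approach does not use the fact that the inputs are sorted, and it holds every distinct number in memory until the end. If you want the (number, count) pairs as an actual stream, one possible alternative is to merge the sources with mergeSorted and then count runs of equal values. The following is only a sketch, assuming Akka 2.6+ (where an implicit ActorSystem provides the materializer); the names SortedCounts, mergeAll and counts are made up for illustration, and the trailing None is a sentinel I add so that the last run gets flushed:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object SortedCounts {
  // Merge any number of sorted sources pairwise; the result stays sorted.
  def mergeAll(sources: Seq[Source[Int, NotUsed]]): Source[Int, NotUsed] =
    sources.foldLeft(Source.empty[Int])((acc, s) => acc.mergeSorted(s))

  // Emit (value, count) as soon as a run of equal values ends.
  def counts(sources: Seq[Source[Int, NotUsed]]): Source[(Int, Int), NotUsed] =
    mergeAll(sources)
      .map(Option(_))
      // Sentinel marking the end of input, so the final run is emitted too.
      .concat(Source.single(Option.empty[Int]))
      .statefulMapConcat[(Int, Int)] { () =>
        // (value, count) of the run currently being counted, if any.
        var run: Option[(Int, Int)] = None
        (next: Option[Int]) => next match {
          case Some(x) if run.exists(_._1 == x) =>
            // Same value as the current run: just increase the count.
            run = run.map { case (v, n) => (v, n + 1) }
            Nil
          case other =>
            // New value or end of input: emit the finished run, start the next.
            val finished = run.toList
            run = other.map(x => (x, 1))
            finished
        }
      }
}
```

With the three sources from the question, materializing SortedCounts.counts(sources) with Sink.seq should give the pairs in ascending order of the number. Only the current run is kept in memory, but this relies on every input really being sorted; if one is not, equal numbers may end up in separate runs and be reported more than once.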