I'm trying to apply various functions, such as mean and volatility, to the elements of an Akka stream. Starting from a list, I create sublists using grouped.
The code below calculates the sum per name in a stream:
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object CalculateSumInStream extends App {
  implicit val actorSystem = ActorSystem()

  case class Person(name: String, age: Double)

  val personSource = Source(List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2)))

  personSource
    .groupBy(maxSubstreams = 2, s => s.name)
    .grouped(10)
    .mapConcat(identity)
    .reduce((person1, person2) => {
      Person(person1.name, person1.age + person2.age)
    })
    .mergeSubstreams
    .runForeach(println)
}
This produces:
Person(2,2.0)
Person(1,100.0)
Can 'non-stream' functions be used to perform substream calculations, for example the mean?
For example, to compute the mean in plain Scala on the same List of Person, I can use:
object CalculateStats extends App {
  case class Person(name: String, age: Double)

  val personSource = List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))

  def calculateMean(personList: List[Person]): (String, Double) = {
    val values = personList.map(m => m.age)
    (personList(0).name, values.sum / values.size)
  }

  personSource
    .groupBy(g => g.name)
    .map(m => calculateMean(m._2))
    .foreach(println)
}
But is this kind of 'pattern' possible in Akka Streams with Scala? By pattern I mean applying a function such as calculateMean, defined above, inside the stream.
So, instead of the .reduce((person1, person2) => ...) step, the stream would invoke the calculateMean function defined above and produce the output:
(1,25.0)
(2,2.0)
where 1 and 2 are the Person names and 25.0 and 2.0 are the mean ages for each name.
If you do not care about the memory this takes, you could just fold
the values into a list and then map that list through your function:
personSource
  .groupBy(maxSubstreams = 2, s => s.name)
  .fold(List.empty[Person])((list, person) => person :: list)
  .map(calculateMean)
  .mergeSubstreams
  .runForeach(println)
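For reference, the fold approach above might be assembled into a complete program roughly as follows. This is only a sketch under some assumptions: it targets Akka 2.6 or later (where an implicit ActorSystem is enough to materialize a stream), and the object name CalculateMeanInStream, the helper meansByName, the system name "mean-demo" and the 5 second timeout are made up for illustration. It collects results with Sink.seq instead of printing directly so the output can be sorted.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CalculateMeanInStream {
  case class Person(name: String, age: Double)

  val people: List[Person] =
    List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))

  // Same plain-Scala function as in the question; it expects a non-empty list.
  def calculateMean(personList: List[Person]): (String, Double) = {
    val values = personList.map(_.age)
    (personList.head.name, values.sum / values.size)
  }

  // Hypothetical helper: runs the stream and collects one (name, mean) pair per substream.
  // maxSubstreams = 2 is kept from the answer; the stream fails if more distinct names appear.
  def meansByName(input: List[Person])(implicit system: ActorSystem): Future[Seq[(String, Double)]] =
    Source(input)
      .groupBy(maxSubstreams = 2, _.name)
      // Buffer every element of the substream; the list is emitted when the substream completes.
      .fold(List.empty[Person])((list, person) => person :: list)
      .map(calculateMean)
      .mergeSubstreams
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("mean-demo")
    try {
      // Blocking is acceptable in a small demo; the timeout is an arbitrary choice.
      val result = Await.result(meansByName(people), 5.seconds)
      // Substreams can finish in any order, so sort before printing.
      result.sortBy(_._1).foreach(println)
    } finally {
      system.terminate()
    }
  }
}
```

Because fold only emits once the substream completes, this works for finite sources; on an unbounded source nothing would ever be emitted and the buffered lists would keep growing.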
Alternatively, you could introduce some intermediate helper data type:
case class PersonAvg(name: String, total: Double, count: Int) {
  def avgAge: Double = total / count
  def +(that: PersonAvg) = {
    copy(total = total + that.total, count = count + that.count)
  }
}
personSource
  .groupBy(maxSubstreams = 2, s => s.name)
  .map(p => PersonAvg(p.name, p.age, 1))
  .reduce(_ + _)
  .map(p => p.name -> p.avgAge)
  .mergeSubstreams
  .runForeach(println)
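One way to sanity-check the helper type without starting an actor system is to run the same map and reduce steps on an ordinary List. The sketch below assumes nothing beyond the Scala standard library; the object name PersonAvgCheck and the value means are illustrative only.

```scala
object PersonAvgCheck {
  case class Person(name: String, age: Double)

  // Running total and element count; combining two values keeps both in step.
  case class PersonAvg(name: String, total: Double, count: Int) {
    def avgAge: Double = total / count
    def +(that: PersonAvg): PersonAvg =
      copy(total = total + that.total, count = count + that.count)
  }

  val people: List[Person] =
    List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))

  // Same map + reduce steps as in the stream, applied to each group of a plain List.
  val means: Map[String, Double] =
    people
      .groupBy(_.name)
      .map { case (name, group) =>
        name -> group.map(p => PersonAvg(p.name, p.age, 1)).reduce(_ + _).avgAge
      }

  def main(args: Array[String]): Unit =
    // Prints (1,25.0) and then (2,2.0)
    means.toList.sortBy(_._1).foreach(println)
}
```

Unlike the fold into a list, this accumulator only ever holds one PersonAvg per name, so memory use stays constant regardless of how many elements a group contains.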
P.S.: You could of course also use an anonymous Tuple3 instead of introducing a dedicated case class, but IMHO a dedicated case class with clear names makes this much more readable and is worth the little bit of extra code.