
How to keep attribute after reduce?


Using the code below, I'm trying to output each person's name together with the sum of their ages:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

object CalculateMeanInStream extends App {

  implicit val actorSystem = ActorSystem()

  case class Person(name: String, age: Double)
  val personSource = Source(List(Person("1", 30),Person("1", 20),Person("1", 20),Person("1", 30),Person("2", 2)))
  val meanPrintSink = Sink.foreach[Double](println)
  val printSink = Sink.foreach[Double](println)

  def calculateMean(values: List[Double]): Double = {
    values.sum / values.size
  }

  personSource.groupBy(maxSubstreams = 2 , s  => s.name)
    .map(m => m.age)
    .reduce(_ + _ )
    .mergeSubstreams
    .runForeach(println)

}

The output is:

2.0
100.0

Is there a way to keep the person's name as part of the reduce, so that the output looks like this:

(2.0 , 2)
(100.0 , 1)

I've tried:

  personSource.groupBy(maxSubstreams = 2 , s  => s.name)
    .reduce((x , y) => x.age + y.age)
    .mergeSubstreams
    .runForeach(println)

but this fails with a compiler error:

type mismatch;
 found   : Double
 required: CalculateMeanInStream.Person
    .reduce((x , y) => x.age + y.age)

Solution

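reduce has to return the same element type it consumes, which is why returning a Double from a substream of Person does not compile. Build a new Person that keeps the name and carries the summed age:

  personSource
    .groupBy(maxSubstreams = 2, s => s.name)
    .reduce((person1, person2) => Person(person1.name, person1.age + person2.age))
    .mergeSubstreams
    .runForeach(println)

This emits one Person per name, with age holding the sum for that group.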
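
If the (sum, name) tuples from the question are wanted rather than Person values, one option is to map each element to a tuple before reducing, so that reduce works on a single type throughout. The following is a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream); the object name SumAgesByName and the helper sumAgesByName are hypothetical names introduced for illustration, and Sink.seq is used instead of println only so the result can be inspected.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SumAgesByName {

  case class Person(name: String, age: Double)

  // Hypothetical helper (not part of Akka): sums ages per name and
  // collects the results as (sum, name) pairs.
  def sumAgesByName(people: List[Person])(implicit system: ActorSystem): Future[Seq[(Double, String)]] =
    Source(people)
      // maxSubstreams = 2 is taken from the question; the stream fails
      // if more distinct names than that show up.
      .groupBy(maxSubstreams = 2, p => p.name)
      // Convert to the target type first, so reduce is (T, T) => T on tuples.
      .map(p => (p.age, p.name))
      // Add the ages; the name is identical within a substream, so keep the first.
      .reduce((a, b) => (a._1 + b._1, a._2))
      .mergeSubstreams
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    val people = List(Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))
    // Blocking here is only for the demo; the order of the pairs is not guaranteed.
    val result = Await.result(sumAgesByName(people), 10.seconds)
    result.foreach(println)
    system.terminate()
  }
}
```

The order in which merged substreams emit is not fixed, so the two tuples may appear in either order. Mapping after the reduce (on the summed Person) would work just as well; mapping first simply keeps Person out of the reduce step.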