Search code examples
apache-sparkapache-spark-sqlwindow-functionsrankapproximation

Efficiently calculate top-k elements in spark


I have a dataframe similarly to:

+---+-----+-----+
|key|thing|value|
+---+-----+-----+
| u1|  foo|    1|
| u1|  foo|    2|
| u1|  bar|   10|
| u2|  foo|   10|
| u2|  foo|    2|
| u2|  bar|   10|
+---+-----+-----+

And want to get a result of:

+---+-----+---------+----+
|key|thing|sum_value|rank|
+---+-----+---------+----+
| u1|  bar|       10|   1|
| u1|  foo|        3|   2|
| u2|  foo|       12|   1|
| u2|  bar|       10|   2|
+---+-----+---------+----+

Currently, there is code similarly to:

val df = Seq(("u1", "foo", 1), ("u1", "foo", 2), ("u1", "bar", 10), ("u2", "foo", 10), ("u2", "foo", 2), ("u2", "bar", 10)).toDF("key", "thing", "value")

 // calculate sums per key and thing
 val aggregated = df.groupBy("key", "thing").agg(sum("value").alias("sum_value"))

 // get topk items per key
 val k = lit(10)
 val topk = aggregated.withColumn("rank", rank over  Window.partitionBy("key").orderBy(desc("sum_value"))).filter('rank < k)

However, this code is very inefficient. A window function generates a total order of items and causes a gigantic shuffle.

How can I calculate top-k items more efficiently? Maybe using approximate functions i.e. sketches similarly to https://datasketches.github.io/ or https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html


Solution

  • This is a classical algorithm of recommender systems.

    case class Rating(thing: String, value: Int) extends Ordered[Rating] {
      def compare(that: Rating): Int = -this.value.compare(that.value)
    }
    
    case class Recommendation(key: Int, ratings: Seq[Rating]) {
      def keep(n: Int) = this.copy(ratings = ratings.sorted.take(n))
    }
    
    val TOPK = 10
    
    df.groupBy('key)
      .agg(collect_list(struct('thing, 'value)) as "ratings")
      .as[Recommendation]
      .map(_.keep(TOPK))
    

    You can also check the source code at:

    • Spotify Big Data Rosetta Code / TopItemsPerUser.scala, several solutions here for Spark or Scio
    • Spark MLLib / TopByKeyAggregator.scala, considered the best practice when using their recommendation algorithm, it looks like their examples still uses RDD though.
    import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
    
    sc.parallelize(Array(("u1", ("foo", 1)), ("u1", ("foo", 2)), ("u1", ("bar", 10)), ("u2", ("foo", 10)),
      ("u2", ("foo", 2)), ("u2", ("bar", 10))))
      .topByKey(10)(Ordering.by(_._2))