
How to sort array of struct type in Spark DataFrame by particular field?


Given the following code:

import java.sql.Date
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SortQuestion extends App{

  val spark = SparkSession.builder().appName("local").master("local[*]").getOrCreate()
  import spark.implicits._
  case class ABC(a: Int, b: Int, c: Int)

  val first = Seq(
    ABC(1, 2, 3),
    ABC(1, 3, 4),
    ABC(2, 4, 5),
    ABC(2, 5, 6)
  ).toDF("a", "b", "c")

  val second = Seq(
    (1, 2, (Date.valueOf("2018-01-02"), 30)),
    (1, 3, (Date.valueOf("2018-01-01"), 20)),
    (2, 4, (Date.valueOf("2018-01-02"), 50)),
    (2, 5, (Date.valueOf("2018-01-01"), 60))
  ).toDF("a", "b", "c")

  first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b")).groupBy("a").agg(sort_array(collect_list("c2")))
    .show(false)

}

Spark produces the following result:

+---+----------------------------------+
|a  |sort_array(collect_list(c2), true)|
+---+----------------------------------+
|1  |[[2018-01-01,20], [2018-01-02,30]]|
|2  |[[2018-01-01,60], [2018-01-02,50]]|
+---+----------------------------------+

This implies that Spark is sorting the array by date (since it is the first field), but I want to instruct Spark to sort by a specific field of that nested struct.
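Spark compares structs field by field, so the first field (the date) decides the order and the second field only breaks ties. A rough plain-Scala analogy (no Spark involved) uses the a = 2 values from the output above as tuples, with the dates kept as ISO strings for simplicity. It shows the difference between that default ordering and sorting by an explicitly chosen field. `StructOrderingDemo` is a hypothetical name.

```scala
object StructOrderingDemo {
  // The (date, value) pairs collected for a = 2 in the question, as plain tuples.
  // ISO-8601 date strings sort the same way as the dates they represent.
  val pairs: Seq[(String, Int)] = Seq(("2018-01-02", 50), ("2018-01-01", 60))

  // Default tuple ordering: compare the first field, then the second on ties.
  // This mirrors how sort_array orders an array of structs.
  val byFirstField: Seq[(String, Int)] = pairs.sorted

  // Explicitly chosen sort key: the second field.
  val bySecondField: Seq[(String, Int)] = pairs.sortBy(_._2)

  def main(args: Array[String]): Unit = {
    println(byFirstField)  // List((2018-01-01,60), (2018-01-02,50))
    println(bySecondField) // List((2018-01-02,50), (2018-01-01,60))
  }
}
```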

I know I can reshape the array to (value, date), but that seems inconvenient. I want a general solution (imagine I have a big nested struct, 5 layers deep, and I want to sort that structure by a particular column).
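For reference, the reshape can be done without discarding the original struct. The following is a minimal sketch, assuming Spark 2.x or later and the column names from the question. It wraps each element in a struct whose first field is the sort key, sorts with `sort_array`, and then projects the original element back out. The names `SortByNestedField`, `sortedByValue`, `key`, `value`, `wrapped` and `sorted` are hypothetical. For a deeper struct, only the key path would change (for example a hypothetical `c2.x.y.z`).

```scala
import java.sql.Date
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object SortByNestedField {
  // Collects c2 per group and sorts the array by c2._2 instead of c2._1.
  // Each element is wrapped in a struct whose FIRST field is the sort key;
  // sort_array compares structs field by field, so the key decides the order.
  def sortedByValue(df: DataFrame): DataFrame =
    df.groupBy("a")
      .agg(sort_array(collect_list(struct(col("c2._2").as("key"), col("c2").as("value")))).as("wrapped"))
      // Selecting a field of an array<struct> yields an array of that field,
      // i.e. the original c2 structs in sorted order.
      .select(col("a"), col("wrapped.value").as("sorted"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("local").master("local[*]").getOrCreate()
    import spark.implicits._

    val second = Seq(
      (1, 2, (Date.valueOf("2018-01-02"), 30)),
      (1, 3, (Date.valueOf("2018-01-01"), 20)),
      (2, 4, (Date.valueOf("2018-01-02"), 50)),
      (2, 5, (Date.valueOf("2018-01-01"), 60))
    ).toDF("a", "b", "c2")

    sortedByValue(second).orderBy("a").show(false)
    spark.stop()
  }
}
```

Because the whole original element is the second wrapper field, ties on the key are broken by comparing the remaining fields, and the result keeps the original struct type.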

Is there a way to do that? Am I missing something?


Solution

  • If you have a complex object, it is much better to use a statically typed Dataset.

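    // Result mirrors the joined schema: a, b, c and the struct column c2.
    // Define it outside any method so Spark can derive an Encoder for it,
    // and keep spark.implicits._ in scope.
    case class Result(a: Int, b: Int, c: Int, c2: (java.sql.Date, Int))
    
    val joined = first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b"))
    joined.as[Result]
      .groupByKey(_.a)
      // Sort each group's c2 values by any field you like (here the Int, _._2).
      .mapGroups((key, xs) => (key, xs.map(_.c2).toList.sortBy(_._2)))
      .show(false)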
    
    // +---+----------------------------------+            
    // |_1 |_2                                |
    // +---+----------------------------------+
    // |1  |[[2018-01-01,20], [2018-01-02,30]]|
    // |2  |[[2018-01-02,50], [2018-01-01,60]]|
    // +---+----------------------------------+
    

    In simple cases it is also possible to use a udf, but in general that leads to inefficient and fragile code, which quickly gets out of control as the complexity of the objects grows.
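A sketch of the udf variant follows. It assumes the `c2` struct from the question (a date followed by an int), and `SortWithUdf` and `sortByValue` are hypothetical names. It could be applied as `joined.groupBy("a").agg(SortWithUdf.sortByValue(collect_list("c2")))`. The hard-coded field positions illustrate the fragility mentioned above.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

object SortWithUdf {
  // Sorts an array<struct<_1: date, _2: int>> by its second field.
  // The input arrives as Seq[Row]; returning Seq[(java.sql.Date, Int)] lets
  // Spark infer the result schema array<struct<_1: date, _2: int>>.
  // Field positions 0 and 1 are hard-coded: any change to the struct's
  // shape silently breaks (or mis-sorts) this function.
  val sortByValue: UserDefinedFunction = udf { (xs: Seq[Row]) =>
    xs.map(r => (r.getDate(0), r.getInt(1))).sortBy(_._2)
  }
}
```

Unlike the built-in functions, the udf is opaque to the optimizer and forces a conversion of every element to JVM objects and back.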