apache-spark · dataframe · pivot

Spark Scala sort PIVOT column


The following produces a pivoted DataFrame:

val pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.show()

I cannot find any way to sort the pivoted columns, nor any documentation of what ordering is assumed. Is it always ascending, or is the column order non-deterministic?

Tips welcome.


Solution

  • According to the Scala docs:

    There are two versions of pivot function: one that requires the caller to specify the list of distinct values to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct values internally.

    Taking a look at how the latter one works:

    // This is to prevent unintended OOM errors when the number of distinct values is large
    val maxValues = df.sparkSession.sessionState.conf.dataFramePivotMaxValues
    // Get the distinct values of the column and sort them so its consistent
    val values = df.select(pivotColumn)
      .distinct()
      .limit(maxValues + 1)
      .sort(pivotColumn)  // ensure that the output columns are in a consistent logical order
      .collect()
      .map(_.get(0))
      .toSeq
    

    and values is passed to the former version. So when using the version that auto-detects the values, the columns are always sorted by the natural ordering of the values. If you need a different ordering, it is easy enough to replicate the auto-detection mechanism yourself and then call the version with explicit values (see the sketch after the output below):

    val df = Seq(("Foo", "UK", 1), ("Bar", "UK", 1), ("Foo", "FR", 1), ("Bar", "FR", 1))
      .toDF("Product", "Country", "Amount")
    df.groupBy("Product")
      .pivot("Country", Seq("UK", "FR")) // natural ordering would be "FR", "UK"
      .sum("Amount")
      .show()
    

    Output:

    +-------+---+---+
    |Product| UK| FR|
    +-------+---+---+
    |    Bar|  1|  1|
    |    Foo|  1|  1|
    +-------+---+---+
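
    For completeness, here is a minimal sketch of replicating the auto-detection step with a custom ordering, assuming the distinct pivot values fit comfortably on the driver. The descending sort is just an example; swap in whatever ordering you need.

    import org.apache.spark.sql.functions.col

    // Assumes spark.implicits._ is in scope (as in spark-shell) for toDF.
    val df = Seq(("Foo", "UK", 1), ("Bar", "UK", 1), ("Foo", "FR", 1), ("Bar", "FR", 1))
      .toDF("Product", "Country", "Amount")

    // Collect the distinct pivot values on the driver, mirroring what Spark
    // does internally, but sort them descending instead of naturally.
    val countries = df.select(col("Country"))
      .distinct()
      .sort(col("Country").desc) // choose any ordering here
      .collect()
      .map(_.getString(0))
      .toSeq

    // Pass the explicit values so the pivoted columns come out in that order.
    df.groupBy("Product")
      .pivot("Country", countries)
      .sum("Amount")
      .show()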