scala, apache-spark, user-defined-functions, median

Scala: Median UDF on DataFrame Column Partitioned by Adjacent Columns


I have a DataFrame with columns "id", "Month", "Day", "Hour", and "value". I want to group/partition by "id", "Month", and "Hour" and take the median of the "value" column, so that I have a daily median per hour for each month. I have a UDF that can calculate the median:

def medianCalculator(seq: Seq[Int]): Int = {
  // Sort ascending so the middle element(s) can be indexed directly
  val sortedSeq = seq.sortWith(_ < _)

  // Odd length: take the single middle element
  if (sortedSeq.size % 2 == 1) sortedSeq(sortedSeq.size / 2)
  else {
    // Even length: average the two middle elements (integer division)
    val (up, down) = sortedSeq.splitAt(sortedSeq.size / 2)
    (up.last + down.head) / 2
  }
}

Taken from here
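
As a quick sanity check, the function works as expected on plain Scala sequences (no Spark needed):

medianCalculator(Seq(7, 1, 3))    // odd length: returns 3
medianCalculator(Seq(7, 1, 3, 5)) // even length: returns (3 + 5) / 2 = 4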

I can apply this to the whole column, by collecting the values to the driver, and get a single overall median using:

// Collect every value to the driver and compute one DataFrame-wide median
val values = df.select("value").collect().flatMap(_.toSeq).toSeq.map(_.toString.toInt)
val overallMedian = medianCalculator(values)

However, this produces one median for the entire column, and I have been unable to work out how to group/partition by "id", "Month", and "Hour" and return the median of "value" for each group in a new column.

EDIT:

Added "Day" column above and comment about daily median per hour for each month.


Solution

  • There isn't a lot to go on in your question, but here are a couple of lines to point you in the right direction: collect the values for each group into a list, then hand that list to the UDF.

    import org.apache.spark.sql.functions.{collect_list, udf}
    import spark.implicits._ // for the $"col" syntax; spark is your SparkSession

    // Wrap the Scala method as a Spark UDF (the trailing _ converts the method to a function)
    val medianUDF = udf(medianCalculator _)

    val output = df
      .groupBy($"id", $"Month", $"Hour")
      .agg(collect_list($"value").as("values"))
      .select($"id", $"Month", $"Hour", medianUDF($"values").as("median"))
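
    For a self-contained check, here is a minimal sketch; the sample values are made up purely for illustration, and medianCalculator and spark are assumed to exist as defined above.

    // Hypothetical sample: readings for the same id/Month/Hour across three days
    val sample = Seq(
      (1, 1, 1, 0, 10),
      (1, 1, 2, 0, 20),
      (1, 1, 3, 0, 40),
      (2, 1, 1, 0, 5)
    ).toDF("id", "Month", "Day", "Hour", "value")

    sample
      .groupBy($"id", $"Month", $"Hour")
      .agg(collect_list($"value").as("values"))
      .select($"id", $"Month", $"Hour", medianUDF($"values").as("median"))
      .show()
    // Expected rows (order may vary):
    // +---+-----+----+------+
    // | id|Month|Hour|median|
    // +---+-----+----+------+
    // |  1|    1|   0|    20|   <- median of 10, 20, 40
    // |  2|    1|   0|     5|
    // +---+-----+----+------+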