I have a DataFrame with columns "id", "Month", "Day", "Hour" and "value". I want to group/partition by "id", "Month" and "Hour" and compute the median of the "value" column, so that I get a daily median per hour for each month. I have a function that calculates the median:
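```scala
// Median of a sequence of Ints. Throws on an empty sequence; for an even
// number of elements the two middle values are averaged with integer
// division, so the result is truncated.
def medianCalculator(seq: Seq[Int]): Int = {
  val sortedSeq = seq.sorted
  if (sortedSeq.size % 2 == 1) sortedSeq(sortedSeq.size / 2)
  else {
    // lower holds the smaller half, upper the larger half
    val (lower, upper) = sortedSeq.splitAt(sortedSeq.size / 2)
    (lower.last + upper.head) / 2
  }
}
```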
Taken from here
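A small variant of the function above, assuming you would rather get a fractional median and an explicit empty case than an integer that is truncated for even-length input. `MedianSketch` and `medianOption` are hypothetical names, not part of the linked code:

```scala
object MedianSketch {
  // Hypothetical variant of medianCalculator: None for empty input, and a
  // Double result so the even-length case keeps its .5 instead of truncating.
  def medianOption(seq: Seq[Int]): Option[Double] =
    if (seq.isEmpty) None
    else {
      val sorted = seq.sorted
      val mid = sorted.size / 2
      // Widen to Long before adding so two large Ints cannot overflow.
      if (sorted.size % 2 == 1) Some(sorted(mid).toDouble)
      else Some((sorted(mid - 1).toLong + sorted(mid).toLong) / 2.0)
    }
}
```

Wrapped with `udf(MedianSketch.medianOption _)`, the `Option` result should show up as a nullable double column.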
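I can apply this to the whole column and get a result using:
```scala
// Collect every value to the driver as a Seq[Int], then take its median
val output = df.select("value").collect().flatMap(_.toSeq).toSeq.map(_.toString.toInt)
val median = medianCalculator(output)
```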
However, I can't find a way to partition by "id", "Month" and "Hour" and return the median of "value" for each group in a new column.
EDIT:
Added the "Day" column above and the note about the daily median per hour for each month.
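The partitioned version asked about here, with the median added as a new column on every row, could be sketched with a window function instead of a UDF. This assumes "value" is numeric and uses Spark SQL's built-in `percentile` aggregate, which computes an exact percentile and, for an even count, interpolates between (here: averages) the two middle values. The object and method names are hypothetical:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.expr

object WindowMedianSketch {
  // Adds a "median" column holding the exact median of "value" within each
  // (id, Month, Hour) partition; every input row is kept.
  def withHourlyMedian(df: DataFrame): DataFrame = {
    // No orderBy, so the window frame is the whole partition.
    val byIdMonthHour = Window.partitionBy("id", "Month", "Hour")
    // percentile(value, 0.5) returns a double.
    df.withColumn("median", expr("percentile(value, 0.5)").over(byIdMonthHour))
  }
}
```

The exact `percentile` keeps each group's values in memory; for very large groups an approximate aggregate may be the safer choice.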
There isn't a lot to go on in your question, but here are a couple of lines to point you in the right direction.
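```scala
import org.apache.spark.sql.functions.{collect_list, udf}
import spark.implicits._ // for the $"..." syntax; `spark` is your SparkSession
// `medianCalculator _` turns the method into a function value; assumes "value" is an Int column
val medianUDF = udf(medianCalculator _)
val output = df
  .groupBy($"id", $"Month", $"Hour")
  .agg(collect_list($"value").as("values")) // all values of each group as an array
  .select($"id", $"Month", $"Hour", medianUDF($"values").as("median"))
```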
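If you would rather avoid the UDF and the intermediate `collect_list` array, the same grouping could be sketched with built-in aggregates. This assumes a numeric "value" column; `percentile` is Spark SQL's exact percentile, while `percentile_approx` in `org.apache.spark.sql.functions` needs Spark 3.1 or later and returns a value of the input type taken from the data, so for an even count it picks one of the middle values rather than averaging them. The object and method names are hypothetical:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, expr, lit, percentile_approx}

object GroupMedianSketch {
  // Exact median per (id, Month, Hour) group as a double column.
  def exactMedians(df: DataFrame): DataFrame =
    df.groupBy("id", "Month", "Hour")
      .agg(expr("percentile(value, 0.5)").as("median"))

  // Approximate median per group (Spark 3.1+); a higher accuracy
  // argument trades memory for precision.
  def approxMedians(df: DataFrame): DataFrame =
    df.groupBy("id", "Month", "Hour")
      .agg(percentile_approx(col("value"), lit(0.5), lit(10000)).as("median"))
}
```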