Tags: apache-spark, pyspark, apache-spark-sql, binning

Performing aggregation after binning a dataframe to a specific size


I have a PySpark dataframe like this (20 records in this example):

+-----------------------+---------+
|TIME_STAMP             |RESULT   |
+-----------------------+---------+
|2020-08-31 00:00:08.395|80.0     |
|2020-08-31 00:03:50.422|27939.368|
|2020-08-31 00:04:27.586|80.0     |
|2020-08-31 00:06:01.476|27956.04 |
|2020-08-31 00:06:12.883|27958.179|
|2020-08-31 00:06:14.082|27939.168|
|2020-08-31 00:08:46.169|80.0     |
|2020-08-31 00:11:18.627|27940.127|
|2020-08-31 00:13:04.91 |80.0     |
|2020-08-31 00:13:18.746|27954.786|
|2020-08-31 00:13:38.569|27958.417|
|2020-08-31 00:13:51.633|27939.395|
|2020-08-31 00:17:23.901|80.0     |
|2020-08-31 00:18:47.043|27940.273|
|2020-08-31 00:20:36.029|27956.06 |
|2020-08-31 00:21:03.403|27958.464|
|2020-08-31 00:21:19.796|27939.9  |
|2020-08-31 00:21:42.546|80.0     |
|2020-08-31 00:26:01.334|80.0     |
|2020-08-31 00:27:53.582|27955.768|
+-----------------------+---------+

I have sorted it by TIME_STAMP and would like to bin the dataframe into groups of 5 records and compute an aggregation (mean) on the RESULT column for each group. So the first 5 records form one group, the next 5 the next group, and so on, giving 4 groups.

Expected output:

bin     mean
5   16802.7174
10  16798.8162
15  22374.829
20  16802.8264

Here, the first bin (labelled 5) covers records 1-5 and its mean is the average of those 5 records, and so on for the remaining bins.

From my research, it seems that I might have to use the monotonically_increasing_id() PySpark function, which I am trying to avoid since I have very large datasets and it might result in an OOM.

Is there a way to achieve this without having to collect the entire dataset to the driver?

As an additional side question: in the above example the total number of records (20) is divisible by 5, but if I have, say, 19 records, is there a way to get 3 groups of 5 records and a final group of 4?


Solution

    1. First, assign a row number to each row using row_number() over (order by TIME_STAMP); no partitioning is required.
    2. Next, bin the row number by taking floor((row_number - 1) / 5).
    3. Finally, it becomes a trivial GROUP BY on that bin.

    Example SQL you can run as-is and easily adapt to your data:

    SELECT floor((id - 1) / 5) AS bin,
           avg(value)          AS mean
    FROM   (SELECT row_number() OVER (ORDER BY value) AS id,
                   value
            FROM   (SELECT explode(array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210)) AS value) a)
    GROUP  BY 1
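
    Since the question is tagged pyspark, here is a minimal DataFrame-API sketch of the same three steps. It assumes the input dataframe is named df and has the TIME_STAMP and RESULT columns shown above:

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()

    # "df" is assumed to be the dataframe from the question (TIME_STAMP, RESULT).
    # Note: an ORDER BY window with no PARTITION BY funnels all rows through one partition.
    w = Window.orderBy("TIME_STAMP")

    binned = (
        df.withColumn("rn", F.row_number().over(w))           # step 1: row number by timestamp
          .withColumn("bin", F.floor((F.col("rn") - 1) / 5))  # step 2: 0-based bin of 5 rows
          .groupBy("bin")                                      # step 3: trivial group-by
          .agg(F.avg("RESULT").alias("mean"))
          .orderBy("bin")
    )

    binned.show(truncate=False)

    The bin column here comes out as 0, 1, 2, 3; if you want the 5, 10, 15, 20 labels from the expected output, multiply (bin + 1) * 5 after the aggregation. And because the bin is just floor((row_number - 1) / 5), a trailing partial group (e.g. the last 4 records out of 19) simply lands in its own final bin, which also answers the side question; the 21-value SQL example above shows the same behaviour with a final bin of one record.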