apache-spark pyspark apache-spark-sql

PySpark aggregate (min/max) function behaviour depends on window orderBy?


I am running PySpark 3.3.1.

In my window function, the max output is not what I expect. Why is that? What is happening in the window group?

I have a window function defined as follows:

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(pd.DataFrame(
    {
        "id": ["A-123","A-123","A-123","A-123","B-123","B-123","B-123","B-123"],
        "val": [0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8]
    }
))

# Partition by id and order by val inside each partition
window_group = Window.partitionBy(F.col('id')).orderBy(F.col('val'))

(
    df
    .withColumn('min_val', F.min(F.col('val')).over(window_group))
    .withColumn('max_val', F.max(F.col('val')).over(window_group))
).show()

This gives the following output:

+-----+---+-------+-------+
|   id|val|min_val|max_val|
+-----+---+-------+-------+
|A-123|0.1|    0.1|    0.1|
|A-123|0.2|    0.1|    0.2|
|A-123|0.3|    0.1|    0.3|
|A-123|0.4|    0.1|    0.4|
|B-123|0.5|    0.5|    0.5| --> should be 0.8?
|B-123|0.6|    0.5|    0.6| --> should be 0.8?
|B-123|0.7|    0.5|    0.7| --> should be 0.8?
|B-123|0.8|    0.5|    0.8|
+-----+---+-------+-------+

To fix the issue I can:

  1. Add .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) to the window spec, or
  2. Remove the orderBy clause.

What is happening here?


Solution

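  • This is a duplicate of: Get the max value over the window in pyspark

    From the PySpark docs:

    When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.

    So with orderBy('val'), each row's frame covers only the rows from the start of its partition up to the current row (plus any rows with the same val), and max becomes a running maximum. min only looks correct because val is sorted ascending, so the smallest value is always the first row of every frame. Both of your fixes give every row a frame that spans the whole partition.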
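To make the two default frames concrete, here is a plain-Python sketch (no Spark needed) that applies an aggregate over each row's frame. window_agg is a hypothetical helper written for illustration: it mimics the documented frame semantics for an ascending orderBy and is not how Spark actually evaluates windows.

```python
from itertools import groupby

rows = [("A-123", 0.1), ("A-123", 0.2), ("A-123", 0.3), ("A-123", 0.4),
        ("B-123", 0.5), ("B-123", 0.6), ("B-123", 0.7), ("B-123", 0.8)]


def window_agg(rows, agg, ordered):
    """Hypothetical helper: return (id, val, agg_over_frame) per row.

    ordered=True  mimics RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
                  for an ascending order on val (ties/peers are included).
    ordered=False mimics ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
    """
    out = []
    # sorted() orders by id, then val, so groupby sees each partition once
    # and the values inside a partition are already ascending.
    for key, grp in groupby(sorted(rows), key=lambda r: r[0]):
        part = [v for _, v in grp]
        for v in part:
            # Growing frame: every row whose order key is <= the current key.
            frame = [x for x in part if x <= v] if ordered else part
            out.append((key, v, agg(frame)))
    return out


for r in window_agg(rows, max, ordered=True):
    print(r)
```

Running it, the ordered max reproduces the question's max_val column, while the ordered and unordered min results are identical, which is why only max seemed "wrong".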