apache-spark, pyspark, window-functions

median over window function is not supported?


It looks like median over a window function is not supported, but the error message does not say so explicitly. Is there another way to calculate a median over a rolling window?

import pyspark # 3.4.1
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (4, 5),
        (3, 10),
        (2, 15),
        (1, 20)
    ],
    ('id1', 'v1')
)
df.createOrReplaceTempView("x")
spark.sql('select median(v1) over (order by id1 rows between 2 preceding and current row) as v1 from x').collect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/jan/.local/lib/python3.8/site-packages/pyspark/sql/session.py", line 1440, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
  File "/home/jan/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/jan/.local/lib/python3.8/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: Cannot specify order by or frame for 'median'.; line 1 pos 7;
Project [v1#4]
+- Project [v1#1L, id1#0L, v1#4, v1#4]
   +- Window [median(v1#1L) windowspecdefinition(id1#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) AS v1#4], [id1#0L ASC NULLS FIRST]
      +- Project [v1#1L, id1#0L]
         +- SubqueryAlias x
            +- View (`x`, [id1#0L,v1#1L])
               +- LogicalRDD [id1#0L, v1#1L], false

Removing order by id1, which would not be desirable anyway, makes no difference.


Solution

  • I was having the same issue, but since Spark 3.5.0 the percentile function is available, which returns the exact requested percentile (so 0.5 for the median); doc ref: pyspark.sql.functions.percentile

    Applied to your case:

    df = spark.createDataFrame(
        [
            (4, 5),
            (3, 10),
            (2, 15),
            (1, 20)
        ],
        ('id1', 'v1')
    )
    df.createOrReplaceTempView("x")
    
    spark.sql('''
        select
            x.*,
            percentile(v1, 0.5) over (
                order by id1
                rows between 2 preceding and current row
            ) as median
        from x
        order by id1
    ''').show()
    

    Plain PySpark version:

    from pyspark.sql import Window
    from pyspark.sql.functions import percentile

    # rowsBetween matches the SQL frame "rows between 2 preceding and current row"
    window = Window \
        .orderBy('id1') \
        .rowsBetween(-2, Window.currentRow)

    df.withColumn('median', percentile('v1', 0.5).over(window)).show()
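
    A note on the frame definition: rowsBetween counts physical rows, exactly like the SQL ROWS frame above. A rangeBetween(-2, Window.currentRow) frame would instead be defined by offsets on the id1 values; with the consecutive ids in this example both happen to produce the same result, but in general only rowsBetween is the direct counterpart of the SQL version.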
    

    Both return the same result:

    id1  v1  median
      1  20  20
      2  15  17.5
      3  10  15
      4   5  10
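
    If upgrading to Spark 3.5 is not an option, a possible fallback is percentile_approx, available in the PySpark functions API since 3.1, which as far as I can tell does accept a window frame on 3.4. It is approximate and returns an actual value from the frame rather than interpolating, so even-sized frames will not yield 17.5. A minimal sketch:

    from pyspark.sql import Window
    from pyspark.sql.functions import percentile_approx

    # Same rolling frame as above; unlike median, percentile_approx
    # tolerates a window frame. The third argument controls accuracy
    # (higher means more precise, at the cost of memory).
    window = Window.orderBy('id1').rowsBetween(-2, Window.currentRow)

    df.withColumn(
        'median_approx',
        percentile_approx('v1', 0.5, 10000).over(window)
    ).show()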