It looks like median over a window function is not supported, but the error message does not say so explicitly. Is there another way to calculate a median over a rolling window?
import pyspark # 3.4.1
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[
(4, 5),
(3, 10),
(2, 15),
(1, 20)
],
('id1', 'v1')
)
df.createOrReplaceTempView("x")
spark.sql('select median(v1) over (order by id1 rows between 2 preceding and current row) as v1 from x').collect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/jan/.local/lib/python3.8/site-packages/pyspark/sql/session.py", line 1440, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
  File "/home/jan/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/jan/.local/lib/python3.8/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: Cannot specify order by or frame for 'median'.; line 1 pos 7;
Project [v1#4]
+- Project [v1#1L, id1#0L, v1#4, v1#4]
   +- Window [median(v1#1L) windowspecdefinition(id1#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) AS v1#4], [id1#0L ASC NULLS FIRST]
      +- Project [v1#1L, id1#0L]
         +- SubqueryAlias x
            +- View (`x`, [id1#0L,v1#1L])
               +- LogicalRDD [id1#0L, v1#1L], false
Removing order by id1, which is obviously not desirable, does not make any difference.
I was having the same issue, but since Spark 3.5.0 the percentile function is available, which returns the exact requested percentile (so 0.5 gives the median). Doc ref: pyspark.sql.functions.percentile
Applied to your case:
df = spark.createDataFrame(
[
(4, 5),
(3, 10),
(2, 15),
(1, 20)
],
('id1', 'v1')
)
df.createOrReplaceTempView("x")
display(spark.sql('''
select
x.*,
percentile(v1, 0.5) over (
order by id1
rows between 2 preceding and current row
) as median
from x
order by id1
'''))
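Plain PySpark version:
from pyspark.sql import Window
from pyspark.sql.functions import percentile  # available since Spark 3.5.0
# rowsBetween mirrors the "rows between 2 preceding and current row" frame of the SQL query
window = Window \
    .orderBy('id1') \
    .rowsBetween(-2, Window.currentRow)
display(
    df.withColumn('median', percentile('v1', 0.5).over(window))
)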
Both return the same result:
| id1 | v1 | median |
|---|---|---|
| 1 | 20 | 20 |
| 2 | 15 | 17.5 |
| 3 | 10 | 15 |
| 4 | 5 | 10 |
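If you want to sanity-check those numbers without Spark, here is a minimal pure-Python sketch of what the window frame computes: for each row ordered by id1, the median of v1 over the current row and up to two rows before it. The helper name rolling_median is made up for illustration and is not part of PySpark.

```python
from statistics import median

# Same rows as the example DataFrame: (id1, v1)
rows = [(4, 5), (3, 10), (2, 15), (1, 20)]

def rolling_median(rows, preceding=2):
    """Hypothetical helper: median of v1 over the current row and up to
    `preceding` rows before it, with rows ordered by id1."""
    ordered = sorted(rows)  # tuples sort by id1 first, i.e. "order by id1"
    result = []
    for i, (id1, v1) in enumerate(ordered):
        # Frame = rows between `preceding` preceding and current row
        frame = [v for _, v in ordered[max(0, i - preceding): i + 1]]
        result.append((id1, v1, median(frame)))
    return result

for row in rolling_median(rows):
    print(row)
```

The first two rows have fewer than three values in their frame (one and two respectively), which is why the median for id1 = 2 is the average of 20 and 15.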