Tags: python, dataframe, apache-spark, pyspark, rdd

avg() over a whole dataframe causing different output


I see that dataframe.agg(avg(Col)) works fine, but when I calculate avg() over a window spanning the whole column (without using any partition), I see different results depending on which column I use with orderBy.

Sample code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col

spark = SparkSession.builder.appName("sample_for_SE").getOrCreate()

# Sample data
data = [
    (1, 10.0, 5.0),
    (3, 20.0, None),
    (5, 15.0, None)
]

schema = ["id", "value1", "value2"]
df = spark.createDataFrame(data, schema=schema)

# Display DataFrame and avg()
df.show()
df.agg(avg("value1")).show()

And the output shows the DataFrame and the average correctly:

+---+------+------+
| id|value1|value2|
+---+------+------+
|  1|  10.0|   5.0|
|  3|  20.0|  NULL|
|  5|  15.0|  NULL|
+---+------+------+

+-----------+
|avg(value1)|
+-----------+
|       15.0|
+-----------+

However, with a window function:

from pyspark.sql.window import Window

#with orderBy("value1")
#========================
w = Window.orderBy("value1")
df.withColumn("AVG",avg(col("value1")).over(w))\
.sort("id",ascending=True)\
.show()


#with orderBy("id")
#========================
w = Window.orderBy("id")
df.withColumn("AVG",avg(col("value1")).over(w))\
.sort("id",ascending=True)\
.show()

Output:

+---+------+------+----+
| id|value1|value2| AVG|
+---+------+------+----+
|  1|  10.0|   5.0|10.0|
|  3|  20.0|  NULL|15.0|
|  5|  15.0|  NULL|12.5|
+---+------+------+----+

+---+------+------+----+
| id|value1|value2| AVG|
+---+------+------+----+
|  1|  10.0|   5.0|10.0|
|  3|  20.0|  NULL|15.0|
|  5|  15.0|  NULL|15.0|
+---+------+------+----+

Questions:

  1. Why does it matter which column I choose in orderBy(), since I am using the whole column to calculate avg() anyway?
  2. Why is avg() not shown consistently as a single fixed number, but instead as 10.0, 15.0, 12.5, etc.?

Solution

  • I ran into the same behaviour in my own experiments too, and it took me quite some time to understand it. I later found a reference link where this is explained, but I cannot find it anymore.

    Anyway, the reason this happens is the following: whenever we use a window function in Spark and order it with .orderBy(), there is an optional frame, .rangeBetween, that is implicitly set by default to (Window.unboundedPreceding, Window.currentRow). This means that the average for each row is computed over all rows from the first row of the ordered window up to and including the current row, i.e. a running average (see the example and the sketch below).

    Example:

    • moving average for row 1 is: average of rows [1]
    • moving average for row 2 is: average of rows [1,2]
    • moving average for row 3 is: average of rows [1,2,3]
    • etc.
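
    A minimal sketch (reusing the df from the question) that makes the implicit frame explicit; both windows should produce the same running average:

    from pyspark.sql.functions import avg, col
    from pyspark.sql.window import Window

    # Implicit frame: orderBy() alone defaults to a running frame
    # of (Window.unboundedPreceding, Window.currentRow).
    w_implicit = Window.orderBy("value1")

    # The same frame written out explicitly.
    w_explicit = Window.orderBy("value1")\
        .rangeBetween(Window.unboundedPreceding, Window.currentRow)

    df.withColumn("AVG", avg(col("value1")).over(w_implicit)).show()
    df.withColumn("AVG", avg(col("value1")).over(w_explicit)).show()  # same result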

    Solution

    To overcome this "issue/feature", you just need to specify the frame you want in the definition of the WindowSpec, i.e.:

    w = Window.orderBy("value1").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    

    In that case, however, I suggest you use the other method, df.agg(avg("value1")), which was already working, since there is no reason to take the average of the entire dataframe with a window function.
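
    A quick check of this fix (a sketch, reusing df and the imports from above): with the fully unbounded frame, every row sees the entire column, so the window average should match df.agg(avg("value1")), i.e. 15.0 on every row.

    w_full = Window.orderBy("value1")\
        .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    # Every row now gets the global average (15.0), regardless of the orderBy column.
    df.withColumn("AVG", avg(col("value1")).over(w_full))\
        .sort("id", ascending=True)\
        .show()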