I explain my question through an example:
Let us assume we have a dataframe as follows:
original_df = sc.createDataFrame([('x', 10,), ('x', 15,), ('x', 10,), ('x', 25,), ('y', 20,), ('y', 10,), ('y', 20,)], ["key", "price"] )
original_df.show()
Output:
+---+-----+
|key|price|
+---+-----+
| x| 10|
| x| 15|
| x| 10|
| x| 25|
| y| 20|
| y| 10|
| y| 20|
+---+-----+
And assume I want to get a list of prices
for each key
using window
:
w = Window.partitionBy('key')
original_df.withColumn('price_list', F.collect_list('price').over(w)).show()
Output:
+---+-----+----------------+
|key|price| price_list|
+---+-----+----------------+
| x| 10|[10, 15, 10, 25]|
| x| 15|[10, 15, 10, 25]|
| x| 10|[10, 15, 10, 25]|
| x| 25|[10, 15, 10, 25]|
| y| 20| [20, 10, 20]|
| y| 10| [20, 10, 20]|
| y| 20| [20, 10, 20]|
+---+-----+----------------+
So far so good.
But if I want to get an ordered list, and I add orderBy
to my window w
I get:
w = Window.partitionBy('key').orderBy('price')
original_df.withColumn('ordered_list', F.collect_list('price').over(w)).show()
Output:
+---+-----+----------------+
|key|price| ordered_list|
+---+-----+----------------+
| x| 10| [10, 10]|
| x| 10| [10, 10]|
| x| 15| [10, 10, 15]|
| x| 25|[10, 10, 15, 25]|
| y| 10| [10]|
| y| 20| [10, 20, 20]|
| y| 20| [10, 20, 20]|
+---+-----+----------------+
Which means orderBy
(kind of) changed the rows (same as what rowsBetween
does) in the window as well! Which it's not supposed to do.
Eventhough I can fix it by specifying rowsBetween
in the window and get the expected results,
w = Window.partitionBy('key').orderBy('price').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
can someone explain why orderBy
affects window
in that way?
Spark Window are specified using three parts: partition, order and frame.
specifically to your question, orderBy is not only to sort the partitioned data but it also change the row frame selection
Below are different windowspec and the corresponding output
Window.orderBy()
+---+-----+----------------------------+
|key|price|price_list |
+---+-----+----------------------------+
|x |15 |[15, 10, 10, 20, 10, 25, 20]|
|x |10 |[15, 10, 10, 20, 10, 25, 20]|
|y |10 |[15, 10, 10, 20, 10, 25, 20]|
|y |20 |[15, 10, 10, 20, 10, 25, 20]|
|x |10 |[15, 10, 10, 20, 10, 25, 20]|
|x |25 |[15, 10, 10, 20, 10, 25, 20]|
|y |20 |[15, 10, 10, 20, 10, 25, 20]|
+---+-----+----------------------------+
Window.partitionBy('key')
+---+-----+----------------+
|key|price| price_list|
+---+-----+----------------+
| x| 15|[15, 10, 10, 25]|
| x| 10|[15, 10, 10, 25]|
| x| 10|[15, 10, 10, 25]|
| x| 25|[15, 10, 10, 25]|
| y| 20| [20, 10, 20]|
| y| 10| [20, 10, 20]|
| y| 20| [20, 10, 20]|
+---+-----+----------------+
Window.partitionBy('key').orderBy('price')
+---+-----+----------------+
|key|price| ordered_list|
+---+-----+----------------+
| x| 10| [10, 10]|
| x| 10| [10, 10]|
| x| 15| [10, 10, 15]|
| x| 25|[10, 10, 15, 25]|
| y| 10| [10]|
| y| 20| [10, 20, 20]|
| y| 20| [10, 20, 20]|
+---+-----+----------------+
w = Window.partitionBy('key').orderBy(F.desc('price'))
+---+-----+----------------+
|key|price| ordered_list|
+---+-----+----------------+
| x| 25| [25]|
| x| 15| [25, 15]|
| x| 10|[25, 15, 10, 10]|
| x| 10|[25, 15, 10, 10]|
| y| 20| [20, 20]|
| y| 20| [20, 20]|
| y| 10| [20, 20, 10]|
+---+-----+----------------+
Window.partitionBy('key').orderBy('price').rowsBetween(Window.unboundedPreceding, Window.currentRow)
+---+-----+----------------+
|key|price| ordered_list|
+---+-----+----------------+
| x| 10| [10]|
| x| 10| [10, 10]|
| x| 15| [10, 10, 15]|
| x| 25|[10, 10, 15, 25]|
| y| 10| [10]|
| y| 20| [10, 20]|
| y| 20| [10, 20, 20]|
+---+-----+----------------+
Window.partitionBy('key').rowsBetween(Window.unboundedPreceding, Window.currentRow)
+---+-----+----------------+
|key|price| ordered_list|
+---+-----+----------------+
| x| 15| [15]|
| x| 10| [15, 10]|
| x| 10| [15, 10, 10]|
| x| 25|[15, 10, 10, 25]|
| y| 10| [10]|
| y| 20| [10, 20]|
| y| 20| [10, 20, 20]|
+---+-----+----------------+