
How does orderBy affect Window.partitionBy in a PySpark DataFrame?


I'll explain my question through an example.
Assume we have the following dataframe:

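from pyspark.sql import SparkSession, Window, functions as F
spark = SparkSession.builder.getOrCreate()  # createDataFrame lives on the SparkSession, not the SparkContext
original_df = spark.createDataFrame([('x', 10), ('x', 15), ('x', 10), ('x', 25), ('y', 20), ('y', 10), ('y', 20)], ["key", "price"])
original_df.show()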

Output:

+---+-----+
|key|price|
+---+-----+
|  x|   10|
|  x|   15|
|  x|   10|
|  x|   25|
|  y|   20|
|  y|   10|
|  y|   20|
+---+-----+

Now assume I want to get a list of prices for each key using a window:

w = Window.partitionBy('key')
original_df.withColumn('price_list', F.collect_list('price').over(w)).show()

Output:

+---+-----+----------------+
|key|price|      price_list|
+---+-----+----------------+
|  x|   10|[10, 15, 10, 25]|
|  x|   15|[10, 15, 10, 25]|
|  x|   10|[10, 15, 10, 25]|
|  x|   25|[10, 15, 10, 25]|
|  y|   20|    [20, 10, 20]|
|  y|   10|    [20, 10, 20]|
|  y|   20|    [20, 10, 20]|
+---+-----+----------------+

So far so good.
But if I want an ordered list and add orderBy to my window w, I get:

w = Window.partitionBy('key').orderBy('price')
original_df.withColumn('ordered_list', F.collect_list('price').over(w)).show()

Output:

+---+-----+----------------+
|key|price|    ordered_list|
+---+-----+----------------+
|  x|   10|        [10, 10]|
|  x|   10|        [10, 10]|
|  x|   15|    [10, 10, 15]|
|  x|   25|[10, 10, 15, 25]|
|  y|   10|            [10]|
|  y|   20|    [10, 20, 20]|
|  y|   20|    [10, 20, 20]|
+---+-----+----------------+

This means orderBy (kind of) changed which rows are in the window as well, the same way rowsBetween does, which I did not expect it to do.

Even though I can fix it by specifying rowsBetween in the window and get the expected results,

w = Window.partitionBy('key').orderBy('price').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

can someone explain why orderBy affects the window in that way?


Solution

  • A Spark window is specified using three parts: partition, order, and frame.

    1. When none of the parts is specified, the whole dataset is treated as a single window.
    2. When a partition is specified using a column, one window is created per distinct value of that column. If only the partition is specified, then when a function is evaluated for a row, all the rows in that partition are taken into account. That's why you see all 4 values [10, 15, 10, 25] for every row in partition x.
    3. When both partition and ordering are specified but no frame is given, the frame defaults to a range frame from unboundedPreceding to currentRow. When the function is evaluated for a row, it includes every row in the partition whose ordering value comes before or ties with the current row's value (lower or equal for the default ascending order). In your case, the first row includes [10, 10] because there are 2 rows in the partition with the same price.
    4. When a frame is specified with rowsBetween or rangeBetween, the evaluation for a row picks only the rows that match the frame rule. For example, if unboundedPreceding and currentRow are specified, it picks the current row and all rows that occur before it. If orderBy is specified, it changes which rows occur before the current row accordingly.

    To answer your question specifically: orderBy does not only sort the partitioned data, it also changes the row frame selection.
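
The behaviour described in point 3 can be illustrated without Spark. What follows is a minimal plain-Python sketch, not Spark's implementation: the helper name collect_list_default_frame is hypothetical, and it only mimics, on the question's data, what collect_list does over a window that has partitionBy('key') and orderBy('price') but no explicit frame. The assumption it encodes is that every row whose price ties with or precedes the current row's price falls inside the frame.

```python
from collections import defaultdict

# Sample data from the question: (key, price) pairs.
rows = [("x", 10), ("x", 15), ("x", 10), ("x", 25),
        ("y", 20), ("y", 10), ("y", 20)]


def collect_list_default_frame(rows, descending=False):
    """Mimic collect_list('price') over partitionBy('key').orderBy('price').

    With an ordering but no explicit frame, the frame runs from the start
    of the partition up to the current row *by value*, so rows that tie
    with the current row on the ordering column are included as well.
    """
    # Step 1: partition the rows by key.
    partitions = defaultdict(list)
    for key, price in rows:
        partitions[key].append(price)

    result = []
    for key in sorted(partitions):
        # Step 2: order the rows inside each partition.
        ordered = sorted(partitions[key], reverse=descending)
        for price in ordered:
            # Step 3: keep every row that precedes or ties with the current one.
            if descending:
                frame = [p for p in ordered if p >= price]
            else:
                frame = [p for p in ordered if p <= price]
            result.append((key, price, frame))
    return result


for row in collect_list_default_frame(rows):
    print(row)
```

Calling collect_list_default_frame(rows, descending=True) mimics orderBy(F.desc('price')) in the same way: the two tied rows with price 10 in partition x both receive the full list, because nothing in the partition sorts after them.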

    Below are different window specifications and their corresponding outputs:

    Window.orderBy()
    +---+-----+----------------------------+
    |key|price|price_list                  |
    +---+-----+----------------------------+
    |x  |15   |[15, 10, 10, 20, 10, 25, 20]|
    |x  |10   |[15, 10, 10, 20, 10, 25, 20]|
    |y  |10   |[15, 10, 10, 20, 10, 25, 20]|
    |y  |20   |[15, 10, 10, 20, 10, 25, 20]|
    |x  |10   |[15, 10, 10, 20, 10, 25, 20]|
    |x  |25   |[15, 10, 10, 20, 10, 25, 20]|
    |y  |20   |[15, 10, 10, 20, 10, 25, 20]|
    +---+-----+----------------------------+
    
    Window.partitionBy('key')
    +---+-----+----------------+
    |key|price|      price_list|
    +---+-----+----------------+
    |  x|   15|[15, 10, 10, 25]|
    |  x|   10|[15, 10, 10, 25]|
    |  x|   10|[15, 10, 10, 25]|
    |  x|   25|[15, 10, 10, 25]|
    |  y|   20|    [20, 10, 20]|
    |  y|   10|    [20, 10, 20]|
    |  y|   20|    [20, 10, 20]|
    +---+-----+----------------+
    
    Window.partitionBy('key').orderBy('price')
    +---+-----+----------------+
    |key|price|    ordered_list|
    +---+-----+----------------+
    |  x|   10|        [10, 10]|
    |  x|   10|        [10, 10]|
    |  x|   15|    [10, 10, 15]|
    |  x|   25|[10, 10, 15, 25]|
    |  y|   10|            [10]|
    |  y|   20|    [10, 20, 20]|
    |  y|   20|    [10, 20, 20]|
    +---+-----+----------------+
    
    Window.partitionBy('key').orderBy(F.desc('price'))
    +---+-----+----------------+
    |key|price|    ordered_list|
    +---+-----+----------------+
    |  x|   25|            [25]|
    |  x|   15|        [25, 15]|
    |  x|   10|[25, 15, 10, 10]|
    |  x|   10|[25, 15, 10, 10]|
    |  y|   20|        [20, 20]|
    |  y|   20|        [20, 20]|
    |  y|   10|    [20, 20, 10]|
    +---+-----+----------------+
    
    Window.partitionBy('key').orderBy('price').rowsBetween(Window.unboundedPreceding, Window.currentRow)
    +---+-----+----------------+
    |key|price|    ordered_list|
    +---+-----+----------------+
    |  x|   10|            [10]|
    |  x|   10|        [10, 10]|
    |  x|   15|    [10, 10, 15]|
    |  x|   25|[10, 10, 15, 25]|
    |  y|   10|            [10]|
    |  y|   20|        [10, 20]|
    |  y|   20|    [10, 20, 20]|
    +---+-----+----------------+
    
    Window.partitionBy('key').rowsBetween(Window.unboundedPreceding, Window.currentRow)
    +---+-----+----------------+
    |key|price|    ordered_list|
    +---+-----+----------------+
    |  x|   15|            [15]|
    |  x|   10|        [15, 10]|
    |  x|   10|    [15, 10, 10]|
    |  x|   25|[15, 10, 10, 25]|
    |  y|   10|            [10]|
    |  y|   20|        [10, 20]|
    |  y|   20|    [10, 20, 20]|
    +---+-----+----------------+
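
The rowsBetween outputs above differ from the ordered window without a frame because a rows frame counts rows by position instead of by value, so tied rows are not pulled in together. The following plain-Python sketch mimics that on the question's data. It is an illustration under stated assumptions, not Spark code: the helper name collect_list_rows_frame and its whole_partition flag are hypothetical. whole_partition=False stands in for rowsBetween(Window.unboundedPreceding, Window.currentRow), and whole_partition=True stands in for rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing), the fix used in the question.

```python
from itertools import groupby

# Sample data from the question: (key, price) pairs.
rows = [("x", 10), ("x", 15), ("x", 10), ("x", 25),
        ("y", 20), ("y", 10), ("y", 20)]


def collect_list_rows_frame(rows, whole_partition=False):
    """Mimic collect_list('price') over an ordered window with a ROWS frame."""
    # Sorting the tuples orders by key first, then by price within each key.
    ordered = sorted(rows)
    result = []
    for key, group in groupby(ordered, key=lambda r: r[0]):
        prices = [price for _, price in group]
        for i, price in enumerate(prices):
            if whole_partition:
                # Frame spans the entire partition for every row.
                frame = list(prices)
            else:
                # Frame is the positional prefix ending at the current row.
                frame = prices[: i + 1]
            result.append((key, price, frame))
    return result


print(collect_list_rows_frame(rows))
print(collect_list_rows_frame(rows, whole_partition=True))
```

With whole_partition=True every row in a partition receives the same fully sorted list, which is the result the question was originally after.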