I am a little confused about the method pyspark.sql.Window.rowsBetween, which accepts Window.unboundedPreceding, Window.unboundedFollowing, and Window.currentRow objects as its start and end arguments. Could you please explain how the function works and how to use Window objects correctly, with some examples? Thank you!
rowsBetween and rangeBetween, as the names suggest, limit the set of rows considered inside a window.
Let us take a simple example.
Starting with data:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfw = (
    spark
    .createDataFrame(
        [
            ("abc", 1, 100),
            ("abc", 2, 200),
            ("abc", 3, 300),
            ("abc", 4, 200),
            ("abc", 5, 100),
        ],
        "name string,id int,price int",
    )
)
dfw.show()
# output
+----+---+-----+
|name| id|price|
+----+---+-----+
| abc| 1| 100|
| abc| 2| 200|
| abc| 3| 300|
| abc| 4| 200|
| abc| 5| 100|
+----+---+-----+
Now, over this data, let's compute a running max, i.e. the max seen so far at each row:
(
dfw
.withColumn(
"rm",
F.max("price").over(Window.partitionBy("name").orderBy("id"))
)
.show()
)
# output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc| 1| 100|100|
| abc| 2| 200|200|
| abc| 3| 300|300|
| abc| 4| 200|300|
| abc| 5| 100|300|
+----+---+-----+---+
So, as expected, it looked at each price from top to bottom, one row at a time, and kept the maximum seen so far. This behaviour corresponds to start = Window.unboundedPreceding and end = Window.currentRow. (When an orderBy is specified without an explicit frame, Spark defaults to rangeBetween(Window.unboundedPreceding, Window.currentRow), which behaves the same here.)
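To make that frame explicit, you can spell it out with rowsBetween. A minimal sketch; on this data it should produce the same running max as the default frame above:
(
    dfw
    .withColumn(
        "rm",
        # Frame runs from the start of the partition to the current row,
        # so each row sees the max of all prices up to and including itself.
        F.max("price").over(
            Window
            .partitionBy("name")
            .orderBy("id")
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
    )
    .show()
)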
Now, changing the rowsBetween values to start = Window.unboundedPreceding and end = Window.unboundedFollowing, we get the following:
(
dfw
.withColumn(
"rm",
F.max("price").over(
Window
.partitionBy("name")
.orderBy("id")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
)
.show()
)
# output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc| 1| 100|300|
| abc| 2| 200|300|
| abc| 3| 300|300|
| abc| 4| 200|300|
| abc| 5| 100|300|
+----+---+-----+---+
As you can see, within the same window it now looks across all values in the partition, both above and below, for the max, instead of stopping at the current row.
The third case is start = Window.currentRow and end = Window.unboundedFollowing:
(
dfw
.withColumn(
"rm",
F.max("price").over(
Window
.partitionBy("name")
.orderBy("id")
.rowsBetween(Window.currentRow, Window.unboundedFollowing)
)
)
.show()
)
# output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc| 1| 100|300|
| abc| 2| 200|300|
| abc| 3| 300|300|
| abc| 4| 200|200|
| abc| 5| 100|100|
+----+---+-----+---+
Now it looks only downwards for the max, starting from the current row.
Also, you are not limited to just these three constants: you can use offsets such as start = Window.currentRow - 1 and end = Window.currentRow + 1, so instead of looking at all values above or below, the frame covers only one row above and one row below the current one, like this:
(
dfw
.withColumn(
"rm",
F.max("price").over(
Window
.partitionBy("name")
.orderBy("id")
.rowsBetween(Window.currentRow-1, Window.currentRow+1)
)
)
.show()
)
# output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc| 1| 100|200|
| abc| 2| 200|300|
| abc| 3| 300|300|
| abc| 4| 200|300|
| abc| 5| 100|200|
+----+---+-----+---+
So you can picture it as a sliding frame inside the window that moves along with the current row being processed.
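Finally, since rangeBetween was mentioned at the start, here is a hedged sketch of the difference on the same data: rowsBetween counts physical rows around the current one, while rangeBetween treats the bounds as offsets on the value of the orderBy column.
(
    dfw
    .withColumn(
        "s",
        # rangeBetween(-100, Window.currentRow) keeps every row whose price
        # lies within [current price - 100, current price], regardless of
        # how many physical rows that covers.
        F.sum("price").over(
            Window
            .partitionBy("name")
            .orderBy("price")
            .rangeBetween(-100, Window.currentRow)
        )
    )
    .show()
)
# Expected per row (working through the arithmetic by hand):
#   price 100 (ids 1, 5) -> 100 + 100             = 200
#   price 200 (ids 2, 4) -> 100 + 100 + 200 + 200 = 600
#   price 300 (id 3)     -> 200 + 200 + 300       = 700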