I'm having trouble applying a lag/window function across an entire DataFrame based on a condition.
From week 2 onwards, I want to subtract the current row's value (`value2`) from the previous row's value (`value1`).
Here is my data:
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()

data = [
    (1, 1, 1),
    (2, 0, 5),
    (3, 0, 10),
    (4, 0, 20),
    (5, 0, 30),
    (6, 0, 40),
]
columns = ["week", "value1", "value2"]
df = spark.createDataFrame(data, columns)
```
Here is my logic for the calculation:
```python
from pyspark.sql import Window

w = Window.orderBy("week")
df2 = df.withColumn(
    "value1",
    f.when(
        f.col("week") > 1,
        f.lag(df["value1"]).over(w) - df["value2"],
    ).otherwise(f.col("value1")),
)
```
This does not work across the whole DataFrame: only the first computed row (week 2) comes out right.
For example, week 3's `value1` should be (-4) - 10 = -14.
In other words, each week's `value1` should be the previous week's already-computed `value1` minus the current week's `value2`, but the lag does not seem to carry the computed values through the DataFrame.
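To make the mismatch concrete, here is a plain-Python sketch (no Spark involved; the list names are illustrative) contrasting what the `lag` expression above computes with the intended recurrence. The key point it assumes and shows: `lag` reads the *original* `value1` column, not the values being written, so from week 3 onward the "previous value" it sees is always 0.

```python
# Rows from the question: (week, value1, value2).
data = [(1, 1, 1), (2, 0, 5), (3, 0, 10), (4, 0, 20), (5, 0, 30), (6, 0, 40)]
value1 = [row[1] for row in data]
value2 = [row[2] for row in data]

# What the question's expression computes: lag() sees the ORIGINAL value1
# column, so from week 3 onward the "previous value" is always 0.
lag_result = [value1[0]] + [value1[i - 1] - value2[i] for i in range(1, len(data))]

# The intended recurrence: each week uses the value just computed for the
# previous week.
expected = [value1[0]]
for i in range(1, len(data)):
    expected.append(expected[-1] - value2[i])

print(lag_result)  # [1, -4, -10, -20, -30, -40]
print(expected)    # [1, -4, -14, -34, -64, -104]
```

Only week 2 agrees, which matches the symptom described above.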
Can anyone please help me here?
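Just wrap the lagged difference in a `sum` window function and you will get the expected result.
Because `lag` reads the original `value1` column (1 in week 1 and 0 afterwards), `lag(value1) - value2` is 1 - 5 = -4 in week 2 and simply `-value2` from week 3 on, so a running sum of those differences reproduces the week-by-week recurrence.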
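```python
from pyspark.sql import Window
from pyspark.sql import functions as f

w = Window.orderBy("week")
df2 = df.withColumn(
    "value1",
    f.when(
        f.col("week") > 1,
        # Running total of (previous row's value1 - current row's value2)
        f.sum(f.lag(df["value1"]).over(w) - df["value2"]).over(w),
    ).otherwise(f.col("value1")),  # week 1 keeps its original value
)
df2.show()
```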
```
+----+------+------+
|week|value1|value2|
+----+------+------+
|   1|     1|     1|
|   2|    -4|     5|
|   3|   -14|    10|
|   4|   -34|    20|
|   5|   -64|    30|
|   6|  -104|    40|
+----+------+------+
```
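Why the running sum works: the recurrence unrolls to value1[n] = value1[1] - (value2[2] + ... + value2[n]). The plain-Python sketch below (no Spark; it mirrors the lag-then-sum idea with `itertools.accumulate`, and the variable names are illustrative) checks this against the step-by-step loop on the question's data.

```python
from itertools import accumulate

# Rows from the question: (week, value1, value2).
data = [(1, 1, 1), (2, 0, 5), (3, 0, 10), (4, 0, 20), (5, 0, 30), (6, 0, 40)]
value1 = [row[1] for row in data]
value2 = [row[2] for row in data]

# Per-row difference the answer sums: lag(value1) - value2, weeks 2..6.
# (Week 1 has no previous row; Spark's lag returns null there.)
deltas = [value1[i - 1] - value2[i] for i in range(1, len(data))]

# Running sum of the differences, like sum(...).over(w) for weeks 2..6.
running = list(accumulate(deltas))
result = [value1[0]] + running  # week 1 keeps its original value1

# Step-by-step recurrence for comparison.
expected = [value1[0]]
for v2 in value2[1:]:
    expected.append(expected[-1] - v2)

print(result)  # [1, -4, -14, -34, -64, -104]
```

Note that this shortcut relies on `value1` being 0 for every week after the first, as in the question's data; if later rows had non-zero `value1`, the summed lag terms would add those values as well and the running sum would no longer match the recurrence.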