
PySpark function to subtract from the previous row's value

I'm running into an issue applying a lag/window function across an entire DataFrame, subject to a condition.

From week 2 onwards, I want to subtract the current row's value (value2) from the previous row's value (value1).

Here is my data:


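from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# Reuse the active session if one exists (e.g. in a notebook), otherwise create one
spark = SparkSession.builder.getOrCreate()

data = [
    (1, 1, 1),
    (2, 0, 5),
    (3, 0, 10),
    (4, 0, 20),
    (5, 0, 30),
    (6, 0, 40),
]
columns = ["week", "value1", "value2"]
df = spark.createDataFrame(data, columns)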


Here is my logic to do the calc:


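from pyspark.sql import Window

w = Window.orderBy('week')  # assumed: `w` is not defined anywhere in the post
df2 = df.withColumn('value1',
                    f.when(f.col('week') > 1,
                           f.lag(df['value1']).over(w) - df['value2'])
                     .otherwise(df['value1']))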


This does not carry through the whole DataFrame; only the first row it applies to (week 2) comes out right.


E.g. week 3's value1 should be (-4) - 10, which equals -14.

I want each week's value1 to be the previous week's value1 minus the current week's value2. The lag does not seem to carry through the entire DataFrame.

Can anyone please help me here?


  • Just wrap the expression in a sum window function and you will get the expected result.

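    # `w` must be ordered by week so that sum() acts as a running total
    df2 = df.withColumn('value1',
                        f.when(f.col('week') > 1,
                               f.sum(f.lag(df['value1']).over(w) - df['value2']).over(w))
                         .otherwise(df['value1']))
    df2.show(truncate=False)

    +----+------+------+
    |week|value1|value2|
    +----+------+------+
    |1   |1     |1     |
    |2   |-4    |5     |
    |3   |-14   |10    |
    |4   |-34   |20    |
    |5   |-64   |30    |
    |6   |-104  |40    |
    +----+------+------+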
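
Why the extra sum helps: lag reads the original value1 column, not the values being computed, so every week after week 2 sees a previous value1 of 0. The rule in the question (each week's value1 is the previous week's computed value1 minus the current value2) unrolls into a running total, which is what the sum window supplies. Below is a minimal plain-Python sketch of that arithmetic on the question's data. It does not use Spark, and the helper names are made up for illustration.

```python
from itertools import accumulate

# Same columns as the question's DataFrame, ordered by week
value1 = [1, 0, 0, 0, 0, 0]
value2 = [1, 5, 10, 20, 30, 40]

def lag_only(v1, v2):
    """Mimic lag(value1) - value2 for week > 1: lag sees the ORIGINAL column."""
    return [v1[0]] + [v1[i - 1] - v2[i] for i in range(1, len(v1))]

def lag_then_running_sum(v1, v2):
    """Mimic sum(lag(value1) - value2) over the ordered window for week > 1."""
    diffs = [v1[i - 1] - v2[i] for i in range(1, len(v1))]
    return [v1[0]] + list(accumulate(diffs))

def recurrence(v1, v2):
    """The intended rule: each week uses the previous week's COMPUTED value1."""
    out = [v1[0]]
    for i in range(1, len(v1)):
        out.append(out[-1] - v2[i])
    return out

print(lag_only(value1, value2))              # [1, -4, -10, -20, -30, -40]
print(lag_then_running_sum(value1, value2))  # [1, -4, -14, -34, -64, -104]
print(recurrence(value1, value2))            # [1, -4, -14, -34, -64, -104]
```

The running sum matches the recurrence here because the original value1 is 0 from week 2 onwards. If later weeks held non-zero value1 entries, the sum would add them in, while the recurrence would overwrite them, so the two would no longer agree.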