
PySpark function to subtract the previous row's value


I'm encountering an issue when applying a lag/window function to an entire DataFrame based on a condition.

I want to subtract the current row's value (value2) from the previous row's value (value1), from week 2 onwards.

Here is my data:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as f
    from pyspark.sql.window import Window

    # 'spark' is the active SparkSession (predefined in notebooks/spark-shell)
    spark = SparkSession.builder.getOrCreate()

    data = [
        (1, 1, 1),
        (2, 0, 5),
        (3, 0, 10),
        (4, 0, 20),
        (5, 0, 30),
        (6, 0, 40)
    ]
    columns = ["week", "value1", "value2"]
    df = spark.createDataFrame(data, columns)

Here is my logic for the calculation:

    w = Window.orderBy("week")

    df2 = df.withColumn('value1',
                        f.when(f.col('week') > 1,
                               f.lag(df['value1']).over(w) - df['value2']
                              ).otherwise(
                            f.col('value1')
                        )
                       )

This does not work across the whole DataFrame; it only comes out right for the first row after week 1.

As you can see from the output:
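
    +----+------+------+
    |week|value1|value2|
    +----+------+------+
    |1   |1     |1     |
    |2   |-4    |5     |
    |3   |-10   |10    |
    |4   |-20   |20    |
    |5   |-30   |30    |
    |6   |-40   |40    |
    +----+------+------+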

E.g. week 3's value1 should be (-4) - (10), which equals -14.

I want the previous week's value1 minus the current week's value2, but the lag does not seem to carry the updated values through the entire DataFrame.

Can anyone please help me here?


Solution

  • Just add a sum window function and you will get the expected result.

    w = Window.orderBy("week")

    df2 = df.withColumn('value1',
                        f.when(f.col('week') > 1,
                               f.sum(f.lag(df['value1']).over(w) - df['value2']).over(w)
                              ).otherwise(
                            f.col('value1')
                        )
                       )
    df2.show(truncate=False)
    
    +----+------+------+
    |week|value1|value2|
    +----+------+------+
    |1   |1     |1     |
    |2   |-4    |5     |
    |3   |-14   |10    |
    |4   |-34   |20    |
    |5   |-64   |30    |
    |6   |-104  |40    |
    +----+------+------+
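
    Why this works: f.lag always reads the original value1 column, so lag(value1) - value2 on its own cannot see previously updated rows; the cumulative f.sum adds up those per-week differences instead. That matches the intended chained subtraction here because, as in the sample data, value1 is 0 from week 2 onward. Under that same assumption, the result also has a closed form: week 1's value1 minus the running total of value2 from week 2 onward. A minimal sketch (reusing w from above; df3 is just an illustrative name):

    # Hedged alternative, assuming value1 is non-zero only in week 1 (true for
    # the sample data): result(n) = value1(week 1) - sum(value2, weeks 2..n).
    df3 = df.withColumn(
        'value1',
        f.when(
            f.col('week') > 1,
            f.first('value1').over(w)
            - (f.sum('value2').over(w) - f.first('value2').over(w))
        ).otherwise(f.col('value1'))
    )
    df3.show(truncate=False)  # prints the same table as above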