Tags: python, pyspark, apache-spark-sql, lag

Lag function in PySpark not showing all the data


I have a fact data table that holds sales data (price, units, volume, and so on). I need to add to the existing dataframe a copy of each of those columns with the suffix "_prev_year", holding the value for the same period in the previous year. Something like this:

+----+------+----------+-----------+--------------------+---------------------+
|year|period|sales_unit|sales_value|sales_unit_prev_year|sales_value_prev_year|
+----+------+----------+-----------+--------------------+---------------------+
|2023|     1|        50|         15|                  45|                   14|
|2023|     2|        55|         16|                  50|                   15|
|2022|     1|        45|         14|                   0|                    0|
|2022|     2|        50|         15|                   0|                    0|
|2021|     1|        40|         13|                   0|                    0|
+----+------+----------+-----------+--------------------+---------------------+

I tried the code below, but for some reason the output only shows the value 2020 in the year column and 12 in the period column.

    from pyspark.sql import Window
    from pyspark.sql.functions import col, lag

    lag_years = 1

    # One window per period, ordered by year, so lag() reads the previous row for that period
    lag_window_spec = Window.partitionBy("period").orderBy("year")

    # There are more fact columns; only these two are listed for the example
    fact_data_columns = ["sales_unit", "sales_value"]
    lagged_columns = []
    for col_name in fact_data_columns:
        lagged_col_name = f"{col_name}_prev_year"
        lagged_columns.append(lag(col(col_name), lag_years).over(lag_window_spec).alias(lagged_col_name))

    result = fact_data_df.select("*", *lagged_columns)

    result.show()

Solution

  • Code

    Create a dataframe holding the previous-year values by adding lag_years to the year column, so that each row is labelled with the year it should be matched against.

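    from pyspark.sql import functions as F

    lag_years = 1
    fact_data_cols = ["sales_unit", "sales_value"]

    # df is the fact data dataframe (fact_data_df in the question)
    prev = df.withColumn('year', F.col('year') + lag_years)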
    
    # prev.show()
    # +----+------+----------+-----------+
    # |year|period|sales_unit|sales_value|
    # +----+------+----------+-----------+
    # |2024|     1|        50|         15|
    # |2024|     2|        55|         16|
    # |2023|     1|        45|         14|
    # |2023|     2|        50|         15|
    # |2022|     1|        40|         13|
    # +----+------+----------+-----------+
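
One likely reason to shift the year and join, rather than rely on lag: lag returns the previous row in the window, which is the previous year only when no year is missing for that period. The following is a plain-Python model of both lookups, not PySpark code; the two-row data set (with 2022 missing) and the helper names lag_by_row and prev_by_year_shift are hypothetical and exist only for this illustration.

```python
# Hypothetical data: period 1 has rows for 2021 and 2023, but none for 2022.
rows = [
    {"year": 2021, "period": 1, "sales_unit": 40},
    {"year": 2023, "period": 1, "sales_unit": 50},
]


def lag_by_row(rows, col, offset=1):
    """Model of lag() over partitionBy('period').orderBy('year'): previous ROW."""
    out = {}
    history_by_period = {}
    for r in sorted(rows, key=lambda r: (r["period"], r["year"])):
        history = history_by_period.setdefault(r["period"], [])
        # Take the value `offset` rows back within the same period, if it exists.
        out[(r["year"], r["period"])] = history[-offset][col] if len(history) >= offset else None
        history.append(r)
    return out


def prev_by_year_shift(rows, col, lag_years=1):
    """Model of the year-shift join: look up exactly (year - lag_years, period)."""
    # Shifting the year forward means a row from year Y is stored under key Y + lag_years.
    lookup = {(r["year"] + lag_years, r["period"]): r[col] for r in rows}
    return {(r["year"], r["period"]): lookup.get((r["year"], r["period"])) for r in rows}


print(lag_by_row(rows, "sales_unit"))
print(prev_by_year_shift(rows, "sales_unit"))
```

In this model the row-based lookup gives 2023 the value from 2021, while the year-shift lookup gives it no match, because there is no 2022 row to match.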
    

    Rename the fact data columns in the previous-year dataframe by adding the _prev_year suffix.

    prev = prev.select('year', 'period', *[F.col(c).alias(f'{c}_prev_year') for c in fact_data_cols])
    
    # prev.show()
    # +----+------+--------------------+---------------------+
    # |year|period|sales_unit_prev_year|sales_value_prev_year|
    # +----+------+--------------------+---------------------+
    # |2024|     1|                  50|                   15|
    # |2024|     2|                  55|                   16|
    # |2023|     1|                  45|                   14|
    # |2023|     2|                  50|                   15|
    # |2022|     1|                  40|                   13|
    # +----+------+--------------------+---------------------+
    

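    Left-join the original dataframe with the previous-year dataframe on year and period. Rows with no previous-year match get null.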

    result = df.join(prev, on=['year', 'period'], how='left')
    
    # result.show()
    # +----+------+----------+-----------+--------------------+---------------------+
    # |year|period|sales_unit|sales_value|sales_unit_prev_year|sales_value_prev_year|
    # +----+------+----------+-----------+--------------------+---------------------+
    # |2023|     1|        50|         15|                  45|                   14|
    # |2023|     2|        55|         16|                  50|                   15|
    # |2022|     1|        45|         14|                  40|                   13|
    # |2022|     2|        50|         15|                null|                 null|
    # |2021|     1|        40|         13|                null|                 null|
    # +----+------+----------+-----------+--------------------+---------------------+