Tags: pyspark, pyspark-pandas

How to create lag columns and union multiple dataframes in PySpark?


I am trying to create lag columns for several dataframes individually and then combine them into a single dataframe.

Since PySpark is lazily evaluated, it appears to compute the lags only after the dataframes have been combined.

from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Define the schema for the DataFrame
schema = StructType([
    StructField("YEARWEEK", StringType(), True),
    StructField("CATEGORY", StringType(), True),
    StructField("vendor_nbr", StringType(), True),
    StructField("dc_nbr", StringType(), True),
    StructField("rejection_rate", FloatType(), True),
    StructField("rating", IntegerType(), True),
    StructField("ts_id", StringType(), True) ,
    StructField("rating_lag1", IntegerType(), True),
    StructField("rating_lag2", IntegerType(), True),
    StructField("rating_lag3", IntegerType(), True),
    StructField("rating_lag4", IntegerType(), True)
])

# Create an empty DataFrame with the specified schema
df_comb = spark.createDataFrame([], schema)

window_spec = Window.orderBy("ts_id", "YEARWEEK")
dfs = []

# Collect the distinct ts_id values (a DataFrame itself is not iterable),
# then build a lagged dataframe per ts_id
for row in df.select("ts_id").distinct().collect():
    dfs.append(df.filter(df.ts_id == row["ts_id"])
                 .sort("ts_id", "YEARWEEK")
                 .withColumn("rating_lag1", lag("rating", 1).over(window_spec))
                 .withColumn("rating_lag2", lag("rating", 2).over(window_spec))
                 .withColumn("rating_lag3", lag("rating", 3).over(window_spec))
                 .withColumn("rating_lag4", lag("rating", 4).over(window_spec))
                 .na.fill({"rating_lag1": 1, "rating_lag2": 1, "rating_lag3": 1, "rating_lag4": 1}))

for dfx in dfs:
  df_comb = df_comb.union(dfx)
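
One way to rule out lazy evaluation as the cause is to force each per-id dataframe to materialize before the union. A small debugging sketch (not part of the original code; cache and count are standard PySpark calls):

# cache each lagged dataframe and trigger an action so the lag
# columns are computed before the union happens
dfs = [d.cache() for d in dfs]
for d in dfs:
    d.count()

If the wrong values persist even after forcing evaluation, the window definition, not the evaluation order, is the likely culprit.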

Just to illustrate the problem:

dfp = df_comb.toPandas()
dfp.iloc[1305:1315]

[screenshot: rows 1305-1315 of the combined dataframe, where rating_lag3 is 5 instead of the expected 1]

My data starts from the first week of 2022, so for the earliest rows lag3 should ideally be null. Since I replaced nulls with 1, lag3 should be 1, but it is 5, which is not right.

What is the correct way to union dataframes with lag columns in PySpark?


Solution

  • Your window spec is wrong: without partitionBy, Spark treats the whole dataframe as a single partition, so lag can read values across ts_id boundaries. You almost certainly intended to partition by ts_id:

    window_spec = Window.partitionBy("ts_id").orderBy("YEARWEEK")
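
With the window partitioned by ts_id, the per-id loop and the unions become unnecessary: lag is evaluated independently within each partition, so all four lag columns can be built in one pass over the whole dataframe. A minimal sketch reusing the names from the question (assuming df is the original dataframe):

from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# each ts_id is its own partition, so lags never cross series boundaries
window_spec = Window.partitionBy("ts_id").orderBy("YEARWEEK")

df_comb = df
for i in range(1, 5):
    df_comb = df_comb.withColumn(f"rating_lag{i}", lag("rating", i).over(window_spec))

# the first i rows of each ts_id have no history; fill those nulls with 1
df_comb = df_comb.na.fill({f"rating_lag{i}": 1 for i in range(1, 5)})

This also guarantees that the first weeks of each ts_id get the fill value 1 instead of a rating leaked from another series, which is the behaviour you expected.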