I am trying to create lag columns for several dataframes individually and then combine them into a single dataframe.
Since PySpark is lazily evaluated, it seems to be calculating the lag after combining the dataframes.
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
# Define the schema for the DataFrame
schema = StructType([
    StructField("YEARWEEK", StringType(), True),
    StructField("CATEGORY", StringType(), True),
    StructField("vendor_nbr", StringType(), True),
    StructField("dc_nbr", StringType(), True),
    StructField("rejection_rate", FloatType(), True),
    StructField("rating", IntegerType(), True),
    StructField("ts_id", StringType(), True),
    StructField("rating_lag1", IntegerType(), True),
    StructField("rating_lag2", IntegerType(), True),
    StructField("rating_lag3", IntegerType(), True),
    StructField("rating_lag4", IntegerType(), True)
])
# Create an empty DataFrame with the specified schema
df_comb = spark.createDataFrame([], schema)
window_spec = Window.orderBy("ts_id", "YEARWEEK")
dfs = []
# collect the distinct ts_id values (a DataFrame itself is not iterable)
for row in df.select("ts_id").distinct().collect():
    dfs.append(
        df.filter(df.ts_id == row["ts_id"])
        .sort("ts_id", "YEARWEEK")
        .withColumn("rating_lag1", lag("rating", 1).over(window_spec))
        .withColumn("rating_lag2", lag("rating", 2).over(window_spec))
        .withColumn("rating_lag3", lag("rating", 3).over(window_spec))
        .withColumn("rating_lag4", lag("rating", 4).over(window_spec))
        .na.fill({"rating_lag1": 1, "rating_lag2": 1, "rating_lag3": 1, "rating_lag4": 1})
    )
for dfx in dfs:
    df_comb = df_comb.union(dfx)
Just to illustrate the problem:
dfp = df_comb.toPandas()
dfp.iloc[1305:1315]
My data starts from the first week of 2022, so for the earliest weeks lag3 should ideally be null. Since I replaced nulls with 1, lag3 should be 1 there, but it is 5, which is not right.
What is the correct way to union dataframes with lag columns in pyspark?
Your window spec looks wrong; I think you intended to partition by ts_id:
window_spec = Window.partitionBy("ts_id").orderBy("YEARWEEK")
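Once the window is partitioned by ts_id, lag restarts at every series boundary, so the per-id filter/union loop is no longer needed. A minimal sketch of the simplified version, assuming df already has the rating, ts_id, and YEARWEEK columns from your schema:

from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# lag() resets within each partition, so the first weeks of every
# ts_id get null, which na.fill then replaces with 1
window_spec = Window.partitionBy("ts_id").orderBy("YEARWEEK")

df_comb = df
for i in range(1, 5):
    df_comb = df_comb.withColumn(f"rating_lag{i}", lag("rating", i).over(window_spec))
df_comb = df_comb.na.fill({f"rating_lag{i}": 1 for i in range(1, 5)})

This computes all four lags in one pass and avoids building a union of one dataframe per ts_id, which gets expensive to plan as the number of distinct ids grows.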