I have a DataFrame in PySpark with data for two years, 2019 and 2020.
If a Booking value on a 2020 date is less than 25, I want to replace it with the moving-average value for the same date in 2019.
I am able to do that manually:
```python
targetDf = df.withColumn("Booking", when(df["date"] == "2020-01-12", 75).otherwise(df["Booking"]))
```
But I have too many values to replace by hand, so I tried the code below:
```python
targetDf = df.withColumn("Booking",
    when(df["Booking"] <= 25, (df["movingAvg"].when(df["date"] == ?)).otherwise(df["Booking"]))
```

I don't know how to write the `?` part — the moving-average value for the same date last year.
You can use a self left join on the conditions `VenueName = VenueName` and `date - 1 year = date`, then pick last year's moving average with `when` if `Booking < 25`:
```python
from pyspark.sql import functions as F

df = df.withColumn("date", F.to_date("date", "dd-MM-yyyy"))

# Self left join: match each row to the row for the same venue one year earlier
df1 = df.alias("df1").join(
    df.alias("df2"),
    (F.col("df1.VenueName") == F.col("df2.VenueName"))
    & (F.expr("df1.date - INTERVAL 1 year") == F.col("df2.date")),
    "left"
).select(
    "df1.name_id", "df1.VenueName", "df1.date",
    F.when(
        F.col("df1.Booking") < 25,
        F.coalesce("df2.movingAvg", "df1.Booking")  # fall back to current Booking when no last-year row
    ).otherwise(F.col("df1.Booking")).alias("Booking"),
    "df1.movingAvg"
)

# verify changes for dates 2020-01-12 / 2019-01-12
df1.filter("date in ('2020-01-12', '2019-01-12')").show()
```
| name_id | VenueName | date       | Booking | movingAvg   |
|---------|-----------|------------|---------|-------------|
| 1367    | Tortuga   | 2020-01-12 | 75      | 37.42857143 |
| 1011    | Tortuga   | 2019-01-12 | 100.0   | 75.10286    |
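To make the join semantics concrete without a Spark session, here is a minimal plain-Python sketch of the same logic: index rows by `(VenueName, date)`, and for any row whose Booking is below the threshold, look up the same venue one year earlier and take its moving average (falling back to the current Booking when there is no match, like `coalesce`). The rows and values below are made up for illustration.

```python
from datetime import date

# Toy rows mimicking the DataFrame: (name_id, VenueName, date, Booking, movingAvg)
rows = [
    (1011, "Tortuga", date(2019, 1, 12), 100.0, 75.10286),
    (1367, "Tortuga", date(2020, 1, 12), 20.0, 37.42857143),
]

# Index rows by (VenueName, date) — this plays the role of the self join.
by_key = {(venue, d): row for row in rows for (_, venue, d, _, _) in [row]}

def replace_low_bookings(rows, threshold=25):
    out = []
    for name_id, venue, d, booking, mavg in rows:
        if booking < threshold:
            # Same venue, same date one year earlier
            # (note: date.replace would raise for Feb 29; Spark's INTERVAL handles that case)
            prev = by_key.get((venue, d.replace(year=d.year - 1)))
            # coalesce: keep the current Booking if there is no last-year row
            booking = prev[4] if prev is not None else booking
        out.append((name_id, venue, d, booking, mavg))
    return out

result = replace_low_bookings(rows)
# The 2020 row (Booking 20.0 < 25) picks up 2019's movingAvg; the 2019 row is unchanged.
```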