Tags: python, dataframe, apache-spark, pyspark, data-cleaning

In PySpark, replace values of column 'Booking' for year 2020


I have a data frame in PySpark with data for two years, 2019 and 2020.


If any Booking value for a 2020 date is less than 25, then replace it with the moving-average value of the same date in 2019.

Example: [image of sample data]

Required: [image of expected output]

Manually, I am able to do that:

from pyspark.sql.functions import when

targetDf = df.withColumn("Booking", when(df["date"] == "2020-01-12", 75).otherwise(df["Booking"]))

But I have too many values to replace one by one, so I tried the code below:

targetDf = df.withColumn("Booking",\
 when(df["Booking"] <= 25, (df["movingAvg"].when(df["date"] == ?)).otherwise(df["Booking"]))

I don't know how to write the (?) part: the moving-average value for the same date last year.


Solution

  • You can use a self left join on the condition VenueName = VenueName and date - 1 year = date, then take last year's moving average where Booking < 25 using when():

    from pyspark.sql import functions as F
    
    # parse the string dates into DateType (input format assumed dd-MM-yyyy)
    df = df.withColumn("date", F.to_date("date", "dd-MM-yyyy"))
    
    # self left join: pair each row with the same venue's row one year earlier
    df1 = df.alias("df1").join(
        df.alias("df2"),
        (F.col("df1.VenueName") == F.col("df2.VenueName")) &
        (F.expr("df1.date - INTERVAL 1 year") == F.col("df2.date")),
        "left"
    ).select(
        "df1.name_id", "df1.VenueName", "df1.date",
        # replace low bookings with last year's moving average;
        # coalesce keeps the current Booking when there is no match
        F.when(
            F.col("df1.Booking") < 25, F.coalesce("df2.movingAvg", "df1.Booking")
        ).otherwise(F.col("df1.Booking")).alias("Booking"),
        "df1.movingAvg"
    )
    
    # verify changes for dates 2020-01-12 / 2019-01-12
    df1.filter("date in ('2020-01-12', '2019-01-12')").show()
    
    +-------+---------+----------+-------+-----------+
    |name_id|VenueName|      date|Booking|  movingAvg|
    +-------+---------+----------+-------+-----------+
    |   1367|  Tortuga|2020-01-12|     75|37.42857143|
    |   1011|  Tortuga|2019-01-12|  100.0|   75.10286|
    +-------+---------+----------+-------+-----------+
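
As a design note, the same join can be written without the SQL expression string. Below is a minimal sketch, assuming the same DataFrame and columns as above, that builds the join key with add_months (shifting each date back twelve months) instead of INTERVAL arithmetic:

    from pyspark.sql import functions as F
    
    # same self left join, but the one-year shift uses add_months
    # instead of the "date - INTERVAL 1 year" SQL expression
    df1 = df.alias("df1").join(
        df.alias("df2"),
        (F.col("df1.VenueName") == F.col("df2.VenueName")) &
        (F.add_months(F.col("df1.date"), -12) == F.col("df2.date")),
        "left"
    ).select(
        "df1.name_id", "df1.VenueName", "df1.date",
        F.when(
            F.col("df1.Booking") < 25, F.coalesce("df2.movingAvg", "df1.Booking")
        ).otherwise(F.col("df1.Booking")).alias("Booking"),
        "df1.movingAvg"
    )

Both forms should produce the same matches for daily data; end-of-month dates are clamped either way (for example, 2020-02-29 maps back to 2019-02-28).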