I have a dataframe like the one below:
from pyspark.sql import Window
from pyspark.sql.functions import avg

df = spark.createDataFrame(
    [(1, 1, 10), (2, 1, 10), (3, 1, None), (4, 1, 10), (5, 1, 10), (6, 1, 20),
     (7, 1, 20), (1, 2, 10), (2, 2, 10), (3, 2, 10), (4, 2, 20), (5, 2, 20)],
    ["Month", "customer", "amount"])

windowPartition = Window.partitionBy("customer").orderBy("Month").rangeBetween(Window.currentRow - 5, Window.currentRow)
df = df.withColumn("avg_6_month", avg("amount").over(windowPartition))
display(df.orderBy("customer", "Month"))
and I want to compute the average over 6 months of data, but only when there are no nulls in between. I was able to achieve the results below using a window function that ignores nulls: for customer 1 the average is calculated even though there is a null value, and for customer 2 the average is still calculated even though there are only 5 months of data.
Since I want to calculate the average only when there are 6 continuous months without nulls, I created a count variable and calculated the average only when the count is greater than or equal to 6, and the result is:
from pyspark.sql.functions import broadcast, when

df2 = df.groupBy("customer").agg({"amount": "count"}).withColumnRenamed("count(amount)", "Amount_count")
df = df.join(broadcast(df2), on="customer", how="left")

windowPartition = Window.partitionBy("customer").orderBy("Month").rangeBetween(Window.currentRow - 5, Window.currentRow)
df = df.withColumn("avg_6_month", avg("amount").over(windowPartition))
df = df.withColumn("avg_6_month", when(df.Amount_count >= 6, avg("amount").over(windowPartition)).otherwise(None))

columns = ["Month", "customer", "amount", "avg_6_month"]
display(df.select(*columns).orderBy("customer", "Month"))
So now the average is not calculated for customer 2, which is what I wanted. But for customer 1 I still don't want the average to be calculated, because there are not 6 continuous months of data without a null in the amount column.
I am new to PySpark, but I know how to achieve this in R:
> amount <- c(10,10,NA,10,10,20,20,10)
> roll_mean(amount, n = 3, align ="right", fill = NA)
[1] NA NA NA NA NA 13.33333 16.66667 16.66667
I am expecting an outcome like the one below in PySpark.
My actual data has many nulls across different months for many customers, so I want to calculate the average only when there are no nulls in 6 continuous months. Is this possible using a window function, or is there another way to achieve this result?
Definitely possible! In your attempt, Amount_count is calculated per customer, but you'll want to do this per six-month window. Something like this should work:
from pyspark.sql import Window
from pyspark.sql.functions import avg, col, lit, max, sum, when

start = 5
calculate_between = 6

df = spark.createDataFrame(
    [(1, 1, 10), (2, 1, 10), (3, 1, None), (4, 1, 10), (5, 1, 10), (6, 1, 20),
     (7, 1, 20), (1, 2, 10), (2, 2, 10), (3, 2, 10), (4, 2, 20), (5, 2, 20)],
    ["month", "customer", "amount"])

windowPartition = Window.partitionBy("customer").orderBy("month").rangeBetween(Window.currentRow - start, Window.currentRow)

# flag windows that contain at least one null amount
df = df.withColumn("six_month_has_null", max(col("amount").isNull()).over(windowPartition))
df = df.withColumn("avg_6_month", avg("amount").over(windowPartition))
df = df.withColumn("avg_6_month", when(df.six_month_has_null, None).otherwise(col("avg_6_month")))
# flag windows that actually span six months
df = df.withColumn("window_has_six_points", sum(lit(1)).over(windowPartition) == calculate_between)
df = df.withColumn("avg_6_month1", when(df.six_month_has_null | ~df.window_has_six_points, None).otherwise(col("avg_6_month")))
display(df)
Result
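As a side note, since count() only counts non-null values, the null flag and the six-row check can be collapsed into a single condition. A minimal sketch reusing the same windowPartition and calculate_between from above (the column name avg_6_month_alt is just illustrative):

from pyspark.sql.functions import count

# count("amount") ignores nulls, so it equals 6 only when all six months
# in the window are present and non-null
df = df.withColumn(
    "avg_6_month_alt",
    when(count("amount").over(windowPartition) == calculate_between,
         avg("amount").over(windowPartition)))

A when() without otherwise() returns null for the rows that fail the condition, which matches the behaviour of avg_6_month1 above.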
For the next data frame, the same code gives the following result.
df = spark.createDataFrame(
    [(1, 1, 1), (2, 1, 2), (3, 1, None), (4, 1, 4), (5, 1, 5), (6, 1, 6),
     (7, 1, 7), (8, 1, 8), (9, 1, 9), (10, 1, None), (11, 1, 11), (12, 1, 12),
     (1, 2, 10), (2, 2, 10), (3, 2, 10), (4, 2, 20), (5, 2, 17)],
    ["month", "customer", "amount"])