Tags: dataframe, pyspark, group-by, datetime-format, week-number

Add rows of data to each group in a Spark dataframe


I have this dataframe -

data = [(0,1,1,201505,3),
        (1,1,1,201506,5),
        (2,1,1,201507,7),
        (3,1,1,201508,2),
        (4,2,2,201750,3),
        (5,2,2,201751,0),
        (6,2,2,201752,1),
        (7,2,2,201753,1)
       ]
cols = ['id','item','store','week','sales']
data_df = spark.createDataFrame(data=data,schema=cols)
display(data_df)

What I want is this -

data_new = [(0,1,1,201505,3,0),
            (1,1,1,201506,5,0),
            (2,1,1,201507,7,0),
            (3,1,1,201508,2,0),
            (4,1,1,201509,0,0),
            (5,1,1,201510,0,0),
            (6,1,1,201511,0,0),
            (7,1,1,201512,0,0),
            (8,2,2,201750,3,0),
            (9,2,2,201751,0,0),
            (10,2,2,201752,1,0),
            (11,2,2,201753,1,0),
            (12,2,2,201801,0,0),
            (13,2,2,201802,0,0),
            (14,2,2,201803,0,0),
            (15,2,2,201804,0,0)]
cols_new = ['id','item','store','week','sales','flag']
data_df_new = spark.createDataFrame(data=data_new,schema=cols_new)
display(data_df_new)

So basically, I want 8 (this could also be 6 or 10) weeks of data for each item-store group. Wherever the 52/53 weeks of a year end, the remaining weeks should roll over into the next year, as shown in the sample above. I need this in PySpark, thanks in advance!
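
For context, here is a minimal sketch of the year-boundary rollover I mean, assuming Spark's ISO-based weekofyear is an acceptable week numbering (the date below is purely illustrative):

from pyspark.sql import functions as F

demo = spark.createDataFrame([("2017-12-30",)], ["d"])
demo = (demo.withColumn("this_week", F.weekofyear(F.to_date("d")))
            .withColumn("next_week", F.weekofyear(F.date_add(F.to_date("d"), 7))))
display(demo)  # this_week = 52, next_week = 1 (rolls over into 2018)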


Solution

  • See my attempt below. I could have made it shorter, but I wanted it to be as explicit as possible, so I didn't chain the solutions. Code below:

    import sys
    from pyspark.sql import functions as F
    from pyspark.sql.functions import expr, col, lit, sequence, row_number
    from pyspark.sql.window import Window
    spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
    
    
    # Convert week of the year to date
    s=data_df.withColumn("week", expr("cast (week as string)")).withColumn('date', F.to_date(F.concat("week",F.lit("6")), "yyyywwu"))
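    # Note: under the legacy parser, "yyyywwu" is year + week-of-year + day-of-week
    # (1 = Monday), so the appended "6" anchors each week to its Saturday; the exact
    # date picked can vary with the locale's week rules.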
    
    
    s = (s.groupby('item', 'store').agg(F.collect_list('sales').alias('sales'),F.collect_list('date').alias('date'))#Put sales and dates in an array
         .withColumn("id", sequence(lit(0), lit(6)))#Create sequence ids with the required expansion range per group
        )
    
    # Explode the dataframe back out, one row per (date, id, sales) entry for each item/store combination
    s = s.selectExpr('item','store','inline(arrays_zip(date,id,sales))')
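    # arrays_zip pads the shorter date/sales arrays with nulls, so the extra ids beyond
    # the original weeks come out as rows with null date and null sales, to be filled in below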
    
    # Create a partition window spanning each item/store combination from start to end
    w = Window.partitionBy('item','store').orderBy('id').rowsBetween(-sys.maxsize, sys.maxsize)
    
    # Create a partition window per item/store/date combination, running from the start of the
    # partition to the current row; this is used to number the generated null-date rows as a group
    w1 = Window.partitionBy('item','store','date').orderBy('id').rowsBetween(Window.unboundedPreceding, Window.currentRow)
    
    s=(s.withColumn('increment', F.when(col('date').isNull(),(row_number().over(w1))*7).otherwise(0))# 7-day increments for each generated null-date row, 0 otherwise
       
       .withColumn('date1', F.when(col('date').isNull(),F.max('date').over(w)).otherwise(col('date')))# For generated rows, start from the last known date in the item/store combination
       
      )
    
    
    
    # Compute the week of year and drop the intermediate columns
    s = s.withColumn("weekofyear", expr("weekofyear(date_add(date1, cast(increment as int)))")).drop('date','increment','date1').na.fill(0)
                   
    
    
    s.show(truncate=False)
    

    Outcome

    +----+-----+---+-----+----------+
    |item|store|id |sales|weekofyear|
    +----+-----+---+-----+----------+
    |1   |1    |0  |3    |5         |
    |1   |1    |1  |5    |6         |
    |1   |1    |2  |7    |7         |
    |1   |1    |3  |2    |8         |
    |1   |1    |4  |0    |9         |
    |1   |1    |5  |0    |10        |
    |1   |1    |6  |0    |11        |
    |2   |2    |0  |3    |50        |
    |2   |2    |1  |0    |51        |
    |2   |2    |2  |1    |52        |
    |2   |2    |3  |1    |1         |
    |2   |2    |4  |0    |2         |
    |2   |2    |5  |0    |3         |
    |2   |2    |6  |0    |4         |
    +----+-----+---+-----+----------+
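
    Note that sequence(lit(0), lit(6)) generates seven week slots per group (ids 0-6), as shown above; if you want the full eight weeks from the question, the upper bound would presumably just move to lit(7).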