Tags: pyspark, feature-engineering

Adding historical path feature to a PySpark dataframe


I have a column 'Event' in my original dataframe, and I want to add the other two columns shown below ('Event_lag' and 'Hist_event').

    Event  Event_lag  Hist_event
        0          N           N
        0          0          N0
        1          0         N00
        0          1        N001

Solution

    from pyspark.sql.functions import lag, col, monotonically_increasing_id, collect_list, concat_ws
    from pyspark.sql import Window

    # sample data (assumes an active SparkSession `spark`)
    df = spark.createDataFrame([(0,), (0,), (1,), (0,)], ["Event"])

    # add a row index to the dataframe to preserve the original row order
    df = df.withColumn("row_idx", monotonically_increasing_id())

    # note: a window with orderBy but no partitionBy moves all rows to a single partition
    w = Window.orderBy("row_idx")

    # add the 'Event_Lag' column: the previous row's Event, with 'N' for the first row
    df = df.withColumn("Event_Lag", lag(col("Event").cast("string")).over(w))
    df = df.fillna({"Event_Lag": "N"})

    # finally, build 'Hist_Event' as the running concatenation of the lagged values,
    # then drop the helper column 'row_idx' to get the final result
    df = df.withColumn("Hist_Event", collect_list(col("Event_Lag")).over(w)) \
           .withColumn("Hist_Event", concat_ws("", "Hist_Event")) \
           .drop("row_idx")
    df.show()
    

    Sample input:

    +-----+
    |Event|
    +-----+
    |    0|
    |    0|
    |    1|
    |    0|
    +-----+
    

    Output is:

    +-----+---------+----------+
    |Event|Event_Lag|Hist_Event|
    +-----+---------+----------+
    |    0|        N|         N|
    |    0|        0|        N0|
    |    1|        0|       N00|
    |    0|        1|      N001|
    +-----+---------+----------+
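
The windowed lag + cumulative `collect_list` above implements a simple recurrence: each row's `Hist_Event` is the previous row's `Hist_Event` plus the previous row's `Event`, seeded with `'N'` for the first row. As a plain-Python illustration of that logic (the function name `add_history` is made up here, not part of the PySpark solution):

```python
def add_history(events):
    """Replicate the lag + running-concatenation logic in plain Python."""
    rows = []
    hist = ""
    prev = "N"  # the first row has no predecessor, so its lag is 'N'
    for e in events:
        hist += prev             # running concatenation of the lagged values
        rows.append((e, prev, hist))
        prev = str(e)            # the current Event becomes the next row's lag
    return rows

print(add_history([0, 0, 1, 0]))
# → [(0, 'N', 'N'), (0, '0', 'N0'), (1, '0', 'N00'), (0, '1', 'N001')]
```

This reproduces the question's expected table and can help sanity-check the Spark output on small samples.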