Tags: dataframe, pyspark, filter, flags, spark-window-function

Adding a flag based on the occurrence of a repetitive pattern in column categories using PySpark


I have a PySpark DataFrame like this:

port#|       log_date    |code
1111 |2022-05-16 08:07:23|AAA
1111 |2022-05-16 08:08:23|XXX
1111 |2022-05-16 08:09:23|BBB
1111 |2022-05-16 08:10:23|CCC
1111 |2022-05-16 08:11:23|YYY
1111 |2022-05-16 08:12:23|DDD
1111 |2022-05-16 08:13:23|EEE
2222 |2022-05-17 09:07:23|AAA
2222 |2022-05-17 09:08:23|XXX
2222 |2022-05-17 09:09:23|BBB
2222 |2022-05-17 09:10:23|CCC
2222 |2022-05-17 09:11:23|YYY
2222 |2022-05-17 09:12:23|DDD
2222 |2022-05-17 09:13:23|EEE

I want to flag the rows that occur between codes XXX and YYY (inclusive), grouped by each port# and sorted by log_date.

I tried window functions that partition by port# and order by log_date, but could not get the desired result. The expected result would be something like this:

port#|       log_date    |code|flag
1111 |2022-05-16 08:07:23|AAA | 0
1111 |2022-05-16 08:08:23|XXX | 1
1111 |2022-05-16 08:09:23|BBB | 1
1111 |2022-05-16 08:10:23|CCC | 1
1111 |2022-05-16 08:11:23|YYY | 1
1111 |2022-05-16 08:12:23|DDD | 0
1111 |2022-05-16 08:13:23|EEE | 0
2222 |2022-05-17 09:07:23|AAA | 0
2222 |2022-05-17 09:08:23|XXX | 1
2222 |2022-05-17 09:09:23|BBB | 1
2222 |2022-05-17 09:10:23|CCC | 1
2222 |2022-05-17 09:11:23|YYY | 1
2222 |2022-05-17 09:12:23|DDD | 0
2222 |2022-05-17 09:13:23|EEE | 0

Can anyone help with how to write this logic in PySpark?


Solution

  • Assuming there is only a single XXX and a single YYY in each port#

    We can find the XXX and YYY timestamps for each port# and flag the rows whose log_date falls within that range.

    Below is the data preparation and the code to execute the above step:

    import pandas as pd
    from io import StringIO
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql import Window
    
    # create (or reuse) a SparkSession so the snippet runs outside an interactive shell
    spark = SparkSession.builder.getOrCreate()
    
    s="""
    port#|log_date|code
    1111 |2022-05-16 08:07:23|AAA
    1111 |2022-05-16 08:08:23|XXX
    1111 |2022-05-16 08:09:23|BBB
    1111 |2022-05-16 08:10:23|CCC
    1111 |2022-05-16 08:11:23|YYY
    1111 |2022-05-16 08:12:23|DDD
    1111 |2022-05-16 08:13:23|EEE
    2222 |2022-05-17 09:07:23|AAA
    2222 |2022-05-17 09:08:23|XXX
    2222 |2022-05-17 09:09:23|BBB
    2222 |2022-05-17 09:10:23|CCC
    2222 |2022-05-17 09:11:23|YYY
    2222 |2022-05-17 09:12:23|DDD
    2222 |2022-05-17 09:13:23|EEE"""
    
    pdf = pd.read_csv(StringIO(s), sep="|")
    spdf = spark.createDataFrame(pdf)
    
    # window spanning the entire partition for each port#, ordered by log_date
    w = (Window.partitionBy("port#")
               .orderBy("log_date")
               .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
    
    (spdf
     # test: log_date of the (single) XXX row within the port#
     .withColumn("test", F.last(F.when(F.col("code") == "XXX", F.col("log_date")), ignorenulls=True).over(w))
     # test2: log_date of the (single) YYY row within the port#
     .withColumn("test2", F.last(F.when(F.col("code") == "YYY", F.col("log_date")), ignorenulls=True).over(w))
     # flag rows whose log_date falls between the XXX and YYY timestamps (inclusive)
     .withColumn("flag", F.when((F.col("log_date") >= F.col("test"))
                                & (F.col("log_date") <= F.col("test2")), 1).otherwise(0))
     .drop("test", "test2")
     .show())
    
    
    #output
    +-----+-------------------+----+----+
    |port#|           log_date|code|flag|
    +-----+-------------------+----+----+
    | 1111|2022-05-16 08:07:23| AAA|   0|
    | 1111|2022-05-16 08:08:23| XXX|   1|
    | 1111|2022-05-16 08:09:23| BBB|   1|
    | 1111|2022-05-16 08:10:23| CCC|   1|
    | 1111|2022-05-16 08:11:23| YYY|   1|
    | 1111|2022-05-16 08:12:23| DDD|   0|
    | 1111|2022-05-16 08:13:23| EEE|   0|
    | 2222|2022-05-17 09:07:23| AAA|   0|
    | 2222|2022-05-17 09:08:23| XXX|   1|
    | 2222|2022-05-17 09:09:23| BBB|   1|
    | 2222|2022-05-17 09:10:23| CCC|   1|
    | 2222|2022-05-17 09:11:23| YYY|   1|
    | 2222|2022-05-17 09:12:23| DDD|   0|
    | 2222|2022-05-17 09:13:23| EEE|   0|
    +-----+-------------------+----+----+
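
  • If a port# can contain multiple XXX ... YYY pairs (the "repetitive pattern" in the question title), the single-occurrence assumption above no longer holds. One possible alternative, sketched below under the assumption that every XXX is eventually followed by a matching YYY, is to flag a row whenever the running count of XXX rows up to and including it exceeds the running count of YYY rows strictly before it (the names w_curr, w_prev, xxx_so_far and yyy_before are just illustrative):

    # frame up to and including the current row
    w_curr = (Window.partitionBy("port#").orderBy("log_date")
                    .rowsBetween(Window.unboundedPreceding, Window.currentRow))
    # frame strictly before the current row
    w_prev = (Window.partitionBy("port#").orderBy("log_date")
                    .rowsBetween(Window.unboundedPreceding, -1))
    
    # XXX rows seen so far, including the current row
    xxx_so_far = F.sum(F.when(F.col("code") == "XXX", 1).otherwise(0)).over(w_curr)
    # YYY rows seen strictly before the current row (empty frame on the first row yields null)
    yyy_before = F.coalesce(F.sum(F.when(F.col("code") == "YYY", 1).otherwise(0)).over(w_prev), F.lit(0))
    
    spdf.withColumn("flag", F.when(xxx_so_far > yyy_before, 1).otherwise(0)).show()

    On the sample data this should produce the same flag column as above, and it should also flag each subsequent XXX ... YYY range within a port#.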