
Mark Rows as True When Condition First Appears, False if Sequentially Repeated


I'm trying to count fault codes when they first appear by marking the "fault_start" column as "True". If a fault repeats before any zero appears (even with other fault codes interleaved, as with 3 and 244 below), it should be marked "False". A zero means the fault(s) have cleared, so if the same fault(s) appear again after a zero, they should be marked "True" again.

I've included a sample of the data below. Note that the "rn" column is simply the row number and can be ignored if needed, and that "fault_start" currently holds the "fault_code" column lagged by one row.

truck_fault_counts_1hz_df = 
Truck                       Timestamp                       fault_code  rn  fault_start
251_PS1_PB_520_REF_111199   2023-07-18T00:01:02.739+0000    3           63  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:03.739+0000    244         64  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:04.739+0000    3           65  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:05.739+0000    244         66  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:06.739+0000    3           67  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:07.739+0000    244         68  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:08.739+0000    3           69  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:09.739+0000    244         70  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:10.739+0000    3           71  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:11.739+0000    244         72  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:12.741+0000    3           73  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:13.741+0000    244         74  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:14.839+0000    49          75  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:15.839+0000    0           76  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:16.840+0000    49          77  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:17.840+0000    0           78  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:18.840+0000    49          79  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:19.840+0000    0           80  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:20.840+0000    49          81  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:21.840+0000    0           82  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:22.840+0000    49          83  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:23.840+0000    0           84  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:24.840+0000    49          85  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:25.841+0000    0           86  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:26.842+0000    49          87  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:27.940+0000    3           88  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:28.940+0000    244         89  3

My attempt below counts every nonzero fault, but it also counts the sequentially repeated occurrences, which it should skip:

from pyspark.sql.functions import col, lit, when

# Initialize a list of numbers to not be included in the distinct_faults list
x = [0.0]
# Select the distinct faults in the fault_code column and convert it to Pandas
distinct_faults_df = truck_fault_counts_1hz_df.dropDuplicates(["fault_code"]).select("fault_code").toPandas()["fault_code"]
# Create a list of the distinct faults in the fault_code column and convert the values to ints
distinct_faults = [int(i) for i in distinct_faults_df if i not in x]
# Sort the list in ascending order
distinct_faults.sort()
# Convert the values to strings
distinct_faults = [str(i) for i in distinct_faults]

# Convert the fault_code and fault_start columns from float to int, then to string
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_code",truck_fault_counts_1hz_df.fault_code.cast('int'))
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_code",truck_fault_counts_1hz_df.fault_code.cast('string'))
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_start",truck_fault_counts_1hz_df.fault_start.cast('int'))
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_start",truck_fault_counts_1hz_df.fault_start.cast('string'))

# Fill any Null values with "0"
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.fillna(value="0", subset=["fault_start"])

for i in distinct_faults:
    # Create a column for row count and fault start
    # fault_start lags fault_code by one row
    # Any row that has a number greater than 0 in fault_code
    truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.\
        withColumn("fault_start",\
        when(((col("fault_code") == i) & (col("fault_start") != i)), 
        lit("True")).\
        when(((col("fault_code") == 0) & (col("fault_start") == 0)), 
        lit("False")).\
        when(((col("fault_code") == i) & (col("fault_start") == i)), 
        lit("False")).\
        when(((col("fault_code") == 0) & (col("fault_start") == i)), 
        lit("False")).\
        otherwise(truck_fault_counts_1hz_df.fault_start))

truck_fault_counts_1hz_noSpam_df = truck_fault_counts_1hz_df.where(truck_fault_counts_1hz_df.fault_start == True).drop("rn")

display(truck_fault_counts_1hz_df)

The output I have currently:

Truck                       Timestamp                       fault_code  rn  fault_start
251_PS1_PB_520_REF_111199   2023-07-18T00:01:02.739+0000    3           63  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:03.739+0000    244         64  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:04.739+0000    3           65  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:05.739+0000    244         66  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:06.739+0000    3           67  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:07.739+0000    244         68  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:08.739+0000    3           69  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:09.739+0000    244         70  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:10.739+0000    3           71  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:11.739+0000    244         72  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:12.741+0000    3           73  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:13.741+0000    244         74  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:14.839+0000    49          75  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:15.839+0000    0           76  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:16.840+0000    49          77  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:17.840+0000    0           78  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:18.840+0000    49          79  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:19.840+0000    0           80  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:20.840+0000    49          81  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:21.840+0000    0           82  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:22.840+0000    49          83  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:23.840+0000    0           84  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:24.840+0000    49          85  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:25.841+0000    0           86  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:26.842+0000    49          87  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:27.940+0000    3           88  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:28.940+0000    244         89  True

The output I'm hoping to achieve:

Truck                       Timestamp                       fault_code  rn  fault_start
251_PS1_PB_520_REF_111199   2023-07-18T00:01:02.739+0000    3           63  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:03.739+0000    244         64  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:04.739+0000    3           65  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:05.739+0000    244         66  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:06.739+0000    3           67  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:07.739+0000    244         68  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:08.739+0000    3           69  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:09.739+0000    244         70  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:10.739+0000    3           71  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:11.739+0000    244         72  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:12.741+0000    3           73  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:13.741+0000    244         74  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:14.839+0000    49          75  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:15.839+0000    0           76  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:16.840+0000    49          77  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:17.840+0000    0           78  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:18.840+0000    49          79  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:19.840+0000    0           80  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:20.840+0000    49          81  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:21.840+0000    0           82  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:22.840+0000    49          83  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:23.840+0000    0           84  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:24.840+0000    49          85  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:25.841+0000    0           86  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:26.842+0000    49          87  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:27.940+0000    3           88  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:28.940+0000    244         89  True

I've also tried using a dictionary, with the fault codes as keys and the fault starts (True/False) as values, to keep track of which faults have started. However, I haven't found a practical way to implement it without iterating over every single row, which is too slow and memory intensive for this dataset (19 million rows). I've also considered making a new column for each fault code, but haven't found a way to implement that either.
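For reference, the dictionary idea amounts to a single pass that remembers which fault codes have appeared since the last 0. The plain-Python sketch below uses a set in place of the dictionary; the function name mark_fault_starts is hypothetical, and it assumes one truck's fault codes already sorted by Timestamp. It is far too slow to run row by row over 19 million rows, but it pins down the rule and is handy for checking a Spark result on a small sample:

```python
def mark_fault_starts(codes):
    """Return True where a nonzero fault code first appears since the last 0."""
    active = set()  # fault codes seen since the most recent 0 (cleared) row
    flags = []
    for code in codes:
        if code == 0:
            active.clear()       # a 0 means all faults have cleared
            flags.append(False)  # a cleared row is never a fault start
        elif code in active:
            flags.append(False)  # repeated before any clear: not a new start
        else:
            active.add(code)
            flags.append(True)   # first appearance since the last clear
    return flags


# fault_code values for rows rn = 63..89 of the sample above
codes = [3, 244] * 6 + [49] + [0, 49] * 6 + [3, 244]
print(mark_fault_starts(codes))
```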

Any help is greatly appreciated!!


Solution

  • Code

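    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # df is the question's truck_fault_counts_1hz_df; a running count of 0 rows per truck labels each block
    W1 = Window.partitionBy('Truck').orderBy('Timestamp')
    df = df.withColumn('reset_cnt', F.sum((F.col('fault_code') == 0).cast('int')).over(W1))
    
    # The first row of each (Truck, block, fault_code) partition is a fault start
    W2 = Window.partitionBy('Truck', 'reset_cnt', 'fault_code').orderBy('Timestamp')
    df = df.withColumn('fault_start', F.row_number().over(W2) == 1)
    # A 0 row marks a cleared fault, never a fault start
    df = df.withColumn('fault_start', F.expr("IF(fault_code = 0, false, fault_start)"))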
    

    How this works

    Partition the dataframe by Truck, order by Timestamp, and take a cumulative sum of the condition fault_code == 0. This creates a column called reset_cnt that distinguishes the blocks of rows between resets, i.e. between the points where the fault(s) are cleared.

    +--------------------+--------------------+----------+---+---------+
    |               Truck|           Timestamp|fault_code| rn|reset_cnt|
    +--------------------+--------------------+----------+---+---------+
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 63|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 64|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 65|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 66|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 67|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 68|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 69|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 70|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 71|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 72|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 73|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 74|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 75|        0|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 76|        1|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 77|        1|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 78|        2|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 79|        2|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 80|        3|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 81|        3|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 82|        4|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 83|        4|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 84|        5|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 85|        5|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 86|        6|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 87|        6|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 88|        6|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 89|        6|
    +--------------------+--------------------+----------+---+---------+
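A plain-Python sketch of what reset_cnt computes for the sample's fault_code sequence (no Spark involved; itertools.accumulate stands in for the windowed F.sum, and the name codes is illustrative). It assumes timestamps are unique within a truck: when a window has an orderBy, Spark's default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so rows with tied timestamps would receive the same running sum.

```python
from itertools import accumulate

# fault_code values for rows rn = 63..89 of the sample (one truck, time-ordered)
codes = [3, 244] * 6 + [49] + [0, 49] * 6 + [3, 244]

# Running count of 0 rows up to and including each row, mirroring
# F.sum((F.col('fault_code') == 0).cast('int')).over(W1)
reset_cnt = list(accumulate(int(code == 0) for code in codes))
print(reset_cnt)
```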
    

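    Create the fault_start column: partition the dataframe by Truck, reset_cnt, and fault_code, and number the rows in each partition in Timestamp order. Row number 1 is the first occurrence of that fault code within the block, so fault_start is true there and false for every repeat.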

    +--------------------+--------------------+----------+---+---------+-----------+
    |               Truck|           Timestamp|fault_code| rn|reset_cnt|fault_start|
    +--------------------+--------------------+----------+---+---------+-----------+
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 63|        0|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 64|        0|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 65|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 66|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 67|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 68|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 69|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 70|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 71|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 72|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 73|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 74|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 75|        0|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 76|        1|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 77|        1|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 78|        2|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 79|        2|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 80|        3|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 81|        3|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 82|        4|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 83|        4|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 84|        5|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 85|        5|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 86|        6|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 87|        6|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 88|        6|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 89|        6|       true|
    +--------------------+--------------------+----------+---+---------+-----------+
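Keeping row number 1 within each (Truck, reset_cnt, fault_code) partition is equivalent to asking, in a time-ordered scan, whether that (reset_cnt, fault_code) pair has been seen before. A plain-Python sketch for a single truck using the sample's codes (the names codes and seen are illustrative, not from the answer) reproduces the table above, including the not-yet-masked true values on the 0 rows:

```python
from itertools import accumulate

# fault_code values for rows rn = 63..89 of the sample (one truck, time-ordered)
codes = [3, 244] * 6 + [49] + [0, 49] * 6 + [3, 244]
reset_cnt = list(accumulate(int(code == 0) for code in codes))

# row_number() == 1 within (reset_cnt, fault_code) <=> first time this pair is seen
seen = set()
fault_start = []
for block, code in zip(reset_cnt, codes):
    key = (block, code)
    fault_start.append(key not in seen)
    seen.add(key)

print(fault_start)
```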
    

    Mask fault_start with false on rows where fault_code is 0, since a 0 marks a cleared fault rather than a fault start.

    +--------------------+--------------------+----------+---+---------+-----------+
    |               Truck|           Timestamp|fault_code| rn|reset_cnt|fault_start|
    +--------------------+--------------------+----------+---+---------+-----------+
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 63|        0|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 64|        0|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 65|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 66|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 67|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 68|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 69|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 70|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 71|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 72|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 73|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 74|        0|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 75|        0|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 76|        1|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 77|        1|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 78|        2|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 79|        2|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 80|        3|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 81|        3|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 82|        4|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 83|        4|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 84|        5|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 85|        5|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 86|        6|      false|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 87|        6|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 88|        6|       true|
    |251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 89|        6|       true|
    +--------------------+--------------------+----------+---+---------+-----------+
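Putting the three steps together, the plain-Python sketch below (single truck, sample codes only; not Spark code, and codes, seen, and starts_per_code are illustrative names) reproduces the expected fault_start column and then counts fault starts per fault code, which is the count the question is after. In Spark, the equivalent count would come from filtering on fault_start and grouping by Truck and fault_code.

```python
from collections import Counter
from itertools import accumulate

# fault_code values for rows rn = 63..89 of the sample (one truck, time-ordered)
codes = [3, 244] * 6 + [49] + [0, 49] * 6 + [3, 244]

# Step 1: block label = running count of 0 (cleared) rows
reset_cnt = list(accumulate(int(code == 0) for code in codes))

# Steps 2 and 3: first occurrence of (block, code) is a start, except on 0 rows
seen = set()
fault_start = []
for block, code in zip(reset_cnt, codes):
    key = (block, code)
    fault_start.append(key not in seen and code != 0)
    seen.add(key)

# Number of fault starts per fault code
starts_per_code = Counter(code for code, start in zip(codes, fault_start) if start)
print(fault_start)
print(starts_per_code)
```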