I'm trying to solve a problem where I need to count rows of fault codes when they first appear by marking the "fault_start" column as "True". If they repeat sequentially without any zeroes appearing, mark them as "False". If a zero appears, that means the fault(s) has cleared and if the same fault(s) appear(s) again, mark it/them as "True".
I've included a sample of the data below. Please note that the "rn" column is simply the row number and can be ignored if needed. Also, "fault_start" is the "fault_code" column which is lagged by one row.
truck_fault_counts_1hz_df =
Truck Timestamp fault_code rn fault_start
251_PS1_PB_520_REF_111199 2023-07-18T00:01:02.739+0000 3 63 49
251_PS1_PB_520_REF_111199 2023-07-18T00:01:03.739+0000 244 64 3
251_PS1_PB_520_REF_111199 2023-07-18T00:01:04.739+0000 3 65 244
251_PS1_PB_520_REF_111199 2023-07-18T00:01:05.739+0000 244 66 3
251_PS1_PB_520_REF_111199 2023-07-18T00:01:06.739+0000 3 67 244
251_PS1_PB_520_REF_111199 2023-07-18T00:01:07.739+0000 244 68 3
251_PS1_PB_520_REF_111199 2023-07-18T00:01:08.739+0000 3 69 244
251_PS1_PB_520_REF_111199 2023-07-18T00:01:09.739+0000 244 70 3
251_PS1_PB_520_REF_111199 2023-07-18T00:01:10.739+0000 3 71 244
251_PS1_PB_520_REF_111199 2023-07-18T00:01:11.739+0000 244 72 3
251_PS1_PB_520_REF_111199 2023-07-18T00:01:12.741+0000 3 73 244
251_PS1_PB_520_REF_111199 2023-07-18T00:01:13.741+0000 244 74 3
251_PS1_PB_520_REF_111199 2023-07-18T00:01:14.839+0000 49 75 244
251_PS1_PB_520_REF_111199 2023-07-18T00:01:15.839+0000 0 76 49
251_PS1_PB_520_REF_111199 2023-07-18T00:01:16.840+0000 49 77 0
251_PS1_PB_520_REF_111199 2023-07-18T00:01:17.840+0000 0 78 49
251_PS1_PB_520_REF_111199 2023-07-18T00:01:18.840+0000 49 79 0
251_PS1_PB_520_REF_111199 2023-07-18T00:01:19.840+0000 0 80 49
251_PS1_PB_520_REF_111199 2023-07-18T00:01:20.840+0000 49 81 0
251_PS1_PB_520_REF_111199 2023-07-18T00:01:21.840+0000 0 82 49
251_PS1_PB_520_REF_111199 2023-07-18T00:01:22.840+0000 49 83 0
251_PS1_PB_520_REF_111199 2023-07-18T00:01:23.840+0000 0 84 49
251_PS1_PB_520_REF_111199 2023-07-18T00:01:24.840+0000 49 85 0
251_PS1_PB_520_REF_111199 2023-07-18T00:01:25.841+0000 0 86 49
251_PS1_PB_520_REF_111199 2023-07-18T00:01:26.842+0000 49 87 0
251_PS1_PB_520_REF_111199 2023-07-18T00:01:27.940+0000 3 88 49
251_PS1_PB_520_REF_111199 2023-07-18T00:01:28.940+0000 244 89 3
The solution below counts all faults that are not zero but fails to not count the sequential repeated occurrences:
# Initialize a list of numbers to not be included in the distinct_faults list
x = [0.0]
# Select the distinct faults in the fault_code column and convert it to Pandas
distinct_faults_df = truck_fault_counts_1hz_df.dropDuplicates(["fault_code"]).select("fault_code").toPandas()["fault_code"]
# Create a list of the distinct faults in the fault_code column and convert the values to ints
distinct_faults = [int(i) for i in distinct_faults_df if i not in x]
# Sort the list in ascending order
# Convert the values to strings
distinct_faults = [str(i) for i in distinct_faults]
# Convert the fault_code and fault_start column to int then to string from float
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_code",truck_fault_counts_1hz_df.fault_code.cast('int'))
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_code",truck_fault_counts_1hz_df.fault_code.cast('string'))
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_start",truck_fault_counts_1hz_df.fault_start.cast('int'))
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_start",truck_fault_counts_1hz_df.fault_start.cast('string'))
# Fill any Null values with "0"
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.fillna(value="0", subset=["fault_start"])
for i in distinct_faults:
# Create a column for row count and fault start
# fault_start lags fault_code by one row
# Any row that has a number greater than 0 in fault_code
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.\
when(((col("fault_code") == i) & (col("fault_start") != i)),
when(((col("fault_code") == 0) & (col("fault_start") == 0)),
when(((col("fault_code") == i) & (col("fault_start") == i)),
when(((col("fault_code") == 0) & (col("fault_start") == i)),
truck_fault_counts_1hz_noSpam_df = truck_fault_counts_1hz_df.where(truck_fault_counts_1hz_df.fault_start == True).drop("rn")
The output I have currently:
Truck Timestamp fault_code rn fault_start
251_PS1_PB_520_REF_111199 2023-07-18T00:01:02.739+0000 3 63 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:03.739+0000 244 64 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:04.739+0000 3 65 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:05.739+0000 244 66 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:06.739+0000 3 67 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:07.739+0000 244 68 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:08.739+0000 3 69 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:09.739+0000 244 70 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:10.739+0000 3 71 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:11.739+0000 244 72 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:12.741+0000 3 73 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:13.741+0000 244 74 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:14.839+0000 49 75 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:15.839+0000 0 76 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:16.840+0000 49 77 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:17.840+0000 0 78 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:18.840+0000 49 79 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:19.840+0000 0 80 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:20.840+0000 49 81 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:21.840+0000 0 82 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:22.840+0000 49 83 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:23.840+0000 0 84 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:24.840+0000 49 85 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:25.841+0000 0 86 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:26.842+0000 49 87 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:27.940+0000 3 88 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:28.940+0000 244 89 True
The output I'm hoping to achieve:
Truck Timestamp fault_code rn fault_start
251_PS1_PB_520_REF_111199 2023-07-18T00:01:02.739+0000 3 63 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:03.739+0000 244 64 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:04.739+0000 3 65 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:05.739+0000 244 66 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:06.739+0000 3 67 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:07.739+0000 244 68 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:08.739+0000 3 69 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:09.739+0000 244 70 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:10.739+0000 3 71 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:11.739+0000 244 72 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:12.741+0000 3 73 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:13.741+0000 244 74 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:14.839+0000 49 75 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:15.839+0000 0 76 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:16.840+0000 49 77 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:17.840+0000 0 78 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:18.840+0000 49 79 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:19.840+0000 0 80 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:20.840+0000 49 81 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:21.840+0000 0 82 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:22.840+0000 49 83 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:23.840+0000 0 84 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:24.840+0000 49 85 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:25.841+0000 0 86 False
251_PS1_PB_520_REF_111199 2023-07-18T00:01:26.842+0000 49 87 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:27.940+0000 3 88 True
251_PS1_PB_520_REF_111199 2023-07-18T00:01:28.940+0000 244 89 True
I've also been trying to implement a dictionary with the fault codes as keys and the fault starts (True/False) as values to keep track of they've started but have not found a practical way implement that does not involve the time and memory intensive operation of iterating over every single row (this data set contains 19 million rows so that is not practical). I've also considered making a new column for each new fault code but also have not found a way to implement that either.
Any help is greatly appreciated!!
W1 = Window.partitionBy('Truck').orderBy('Timestamp')
df = df.withColumn('reset_cnt', F.sum((F.col('fault_code') == 0).cast('int')).over(W1))
W2 = Window.partitionBy('Truck', 'reset_cnt', 'fault_code').orderBy('Timestamp')
df = df.withColumn('fault_start', F.row_number().over(W2) == 1)
df = df.withColumn('fault_start', F.expr("IF(fault_code = 0, false, fault_start)"))
Partition the dataframe by Truck
and order by Timestamp
then calculate the cumulative sum over the condition when fault_code
equals 0
. This will create a column called reset_cnt
which will help us distinguish between different blocks of rows where fault_code
is reset i.e fault(s) are cleared.
| Truck| Timestamp|fault_code| rn|reset_cnt|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 63| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 64| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 65| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 66| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 67| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 68| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 69| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 70| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 71| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 72| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 73| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 74| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 75| 0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 76| 1|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 77| 1|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 78| 2|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 79| 2|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 80| 3|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 81| 3|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 82| 4|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 83| 4|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 84| 5|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 85| 5|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 86| 6|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 87| 6|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 88| 6|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 89| 6|
Create fault_start
column: partition the dataframe by Truck
, reset_cnt
, and fault_code
and assign row numbers to identify duplicate fault_codes
per partition.
| Truck| Timestamp|fault_code| rn|reset_cnt|fault_start|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 63| 0| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 64| 0| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 65| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 66| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 67| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 68| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 69| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 70| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 71| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 72| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 73| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 74| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 75| 0| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 76| 1| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 77| 1| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 78| 2| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 79| 2| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 80| 3| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 81| 3| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 82| 4| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 83| 4| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 84| 5| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 85| 5| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 86| 6| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 87| 6| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 88| 6| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 89| 6| true|
Mask the rows in fault_start
with false where fault_code
is 0
| Truck| Timestamp|fault_code| rn|reset_cnt|fault_start|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 63| 0| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 64| 0| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 65| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 66| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 67| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 68| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 69| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 70| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 71| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 72| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 73| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 74| 0| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 75| 0| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 76| 1| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 77| 1| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 78| 2| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 79| 2| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 80| 3| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 81| 3| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 82| 4| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 83| 4| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 84| 5| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 85| 5| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 0| 86| 6| false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 49| 87| 6| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 3| 88| 6| true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...| 244| 89| 6| true|