Classifying intermittent signals in a time series - is there a better way to write this in Python?
Problem: a sensor produces a signal that can be intermittent. For example, it may read 0.01 in one period, 0 in the next, and 0.01 again in the period after that. This is normal by design.
The aim of the analysis is to detect the signal despite its intermittent nature, essentially ignoring any gaps that may be present. The analysis is not done in real time, so looking ahead is acceptable. For example, if gaps of at most two periods are to be ignored, the results would be as follows.
signal | detection |
---|---|
0 | FALSE |
0.01 | TRUE |
0.036 | TRUE |
0 | TRUE |
0.2 | TRUE |
0 | FALSE |
0 | FALSE |
0 | FALSE |
0 | FALSE |
0.5 | TRUE |
0 | TRUE |
0 | TRUE |
0.1 | TRUE |
0.0 | FALSE |
0.0 | FALSE |
0.0 | FALSE |
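The table can also be written down as a small test fixture, which makes it easy to check any candidate implementation against the expected result. This is only a sketch: the names `expected` and `check_detection` are hypothetical, and the interface (a callable that takes the readings and returns one boolean per reading) is an assumption.

```python
import pandas as pd

# The example from the table: readings and the detection expected for max_gap = 2.
expected = pd.DataFrame({
    "signal": [0, 0.01, 0.036, 0, 0.2, 0, 0, 0, 0, 0.5, 0, 0, 0.1, 0.0, 0.0, 0.0],
    "detection": [False, True, True, True, True, False, False, False, False,
                  True, True, True, True, False, False, False],
})


def check_detection(detect, frame=expected):
    """Return True if detect(signal) reproduces the expected detection column.

    `detect` is any callable taking a Series of readings and returning
    one boolean per reading (hypothetical interface, assumed for this sketch).
    """
    result = pd.Series(list(detect(frame["signal"])), index=frame.index, dtype=bool)
    return bool((result == frame["detection"]).all())


# A naive detector that does not bridge gaps at all fails the check.
print(check_detection(lambda s: s > 0))
```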
Solution: since my programming skills are still at a beginner level, I wrote the following function. The code after the function contains a demonstration. The deficiency of the function is that it always lengthens the detection by the gap it ignores: it does not look ahead to check whether there is a signal anywhere within the gap.
#%%
from IPython.display import display
import pandas as pd
#%%
def find_continuous(df,threshold,max_gap):
    #df - series containing data
    #threshold - minimum value (inclusive to detect)
    #max_gap - number of time periods after the last value >= threshold to be considered as still containing the signal
    i = 0
    min_value=threshold
    currentlyPriming = False
    primeTimes = []
    PrimeTrue=[]
    Prime2=[]
    distance=0
    distance_to_check=0
    distance_checked=1
    while i < (len(df)):
        print('element equals ',df.iloc[i],', index is ',df.index[i],', current i is ',i)
        if df.iloc[i] < min_value and len(PrimeTrue)>0:
            currentlyPriming=False
            print ('currently priming set to False')
            print('last index element in primeTrue list is ',PrimeTrue[-1])
            if max_gap==0:
                print('max gap is at zero')
            elif max_gap==1 and df.index[i]-PrimeTrue[-1]==1:
                print('max gap is 1 and this element is next after positive')
                primeTimes.append(df.index[i])
            elif max_gap>=2:
                try:
                    distance=(Prime2[-1]-PrimeTrue[-1])
                    distance_to_check=max(max_gap-distance,0)
                    print('last index element in Prime2 list is ',Prime2[-1])
                except:
                    print('Prime2 has not been initiated, first clustering detection')
                    distance=88888
                    distance_to_check=max(max_gap-1,0)
                print('distance is ',distance,' distance to check is ', distance_to_check )
                if distance_to_check>0:
                    primeTimes.append(df.index[i])
                    Prime2.append(df.index[i])
                    distance_checked+=1
                    print('distance checked is ',distance_checked)
                elif distance_to_check==0:
                    distance_checked=1
        elif df.iloc[i] < min_value and len(PrimeTrue)==0:
            currentlyPriming=False
            print('element is less than minimum value and element greater than minimum value was not found yet')
        elif df.iloc[i] >= min_value:
            PrimeTrue.append(df.index[i])
            if currentlyPriming:
                primeTimes.append(df.index[i])
                print('section d, priming is ',currentlyPriming )
            elif not currentlyPriming:
                primeTimes.append(df.index[i])
                currentlyPriming = True
                print('section f, priming is ',currentlyPriming )
        i += 1
    return primeTimes
# %%
if __name__ == "__main__":
    values=[0.05,0,0,0,0,0.037037037,0,0,0,0.035714286,0,0.05,0,0,0,0,0,0,0,0.025677,0,0.05,0,0,0,0.04,0,0.031037037,0,0,0,0,0,0.04,0,0,0,0.074074074,0,0.032258065,0,0,0,0.001,0,0,0,0,0,0,0,0,0,0,0.060606061,0,0,0,0.060606061,0,0,0,0,0,0,0,0,0]
    v1=pd.DataFrame(data=values,index=None,columns=['values'])
    list2=[]
    list2=find_continuous(v1['values'],0.035,2)
    for k in range(len(list2)):
        print(k)
        v1.at[list2[k],'cluster']=list2[k]
    with pd.option_context("display.max_rows", v1.shape[0]):
        display(v1)
Question: is there a better way to write this in Python, and how would a highly skilled Python developer write it?
Thank you!
I think the simplest approach is to form groups of consecutive zero signals, count the size of each group, and then use conditions on the group size to determine the detection value. This avoids an explicit loop over the rows and is clearer:
import pandas as pd
import numpy as np

df = pd.DataFrame({'signal': [0, 0.01, 0.036, 0, 0.2, 0, 0, 0, 0, 0.5, 0, 0, 0.1, 0.0, 0.0, 0.0]})

gp = 0
gp_prev = False

def func(x):
    global gp, gp_prev
    if x == 0:
        if not gp_prev:
            gp += 1
            gp_prev = True
        return gp
    else:
        gp_prev = False
        return 0

# use func to map and number groupings of zero and non-zero groups
df['group'] = df['signal'].map(func)
# collect on the groups, and record their sizes in column gsize
df['gsize'] = df.groupby('group')['signal'].transform('size')
# mark detection True or False according to type and size of groups
# note that group 1 (starting zero signal) is treated as a special case
df['detection'] = np.where(((df['group'] > 0) & (df['gsize'] > 2)) | ((df['group'] == 1) & (df.loc[0, 'signal'] == 0)), False, True)
# clean up by dropping the temporary columns
df_final = df.drop(['group', 'gsize'], axis=1)
print(df)
which gives (printing df rather than df_final, so as to show the temporary working columns):
    signal  group  gsize  detection
0    0.000      1      1      False
1    0.010      0      5       True
2    0.036      0      5       True
3    0.000      2      1       True
4    0.200      0      5       True
5    0.000      3      4      False
6    0.000      3      4      False
7    0.000      3      4      False
8    0.000      3      4      False
9    0.500      0      5       True
10   0.000      4      2       True
11   0.000      4      2       True
12   0.100      0      5       True
13   0.000      5      3      False
14   0.000      5      3      False
15   0.000      5      3      False
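
A variation on the same idea, offered as a minimal sketch rather than a definitive implementation: the run labels can be computed with `shift` and `cumsum` instead of a function that relies on global state, and the threshold and maximum gap can be passed in as parameters. The function name `detect_with_gaps` and its parameter names are made up for illustration. The sketch assumes the inclusive threshold used in the question's function, and it assumes that a gap is only bridged when a signal occurs both before and after it, so leading and trailing gaps are never flagged, whatever their length.

```python
import pandas as pd


def detect_with_gaps(signal: pd.Series, threshold: float, max_gap: int) -> pd.Series:
    """Flag periods that carry a signal, bridging interior gaps of up to max_gap periods."""
    # True where the reading counts as a signal (inclusive threshold, as in the question).
    active = signal >= threshold
    # Label consecutive runs of equal values: the label increases at every change.
    run_id = active.ne(active.shift()).cumsum()
    # Length of the run that each row belongs to.
    run_len = active.groupby(run_id).transform("size")
    # Assumption: a gap is bridged only if a signal exists on both sides of it.
    seen_before = active.cumsum() > 0
    seen_after = active[::-1].cumsum()[::-1] > 0
    bridged = ~active & (run_len <= max_gap) & seen_before & seen_after
    return active | bridged


signal = pd.Series([0, 0.01, 0.036, 0, 0.2, 0, 0, 0, 0, 0.5, 0, 0, 0.1, 0.0, 0.0, 0.0])
detection = detect_with_gaps(signal, threshold=0.01, max_gap=2)
print(pd.DataFrame({"signal": signal, "detection": detection}))
```

With `threshold=0.01` and `max_gap=2`, this should reproduce the detection column from the question's table. Unlike the version above, a trailing run of one or two zeros is not flagged here, which follows the question's requirement that detection should not be lengthened past the last signal.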