Tags: python, dataframe, time-series, signal-processing

How can I improve my Python code for classifying intermittent signals in a timeseries?


Classifying intermittent signals in the timeseries - is there a better way to write this in Python?

Problem: a sensor produces a signal that can be intermittent. For example, one period it reads 0.01, the next period 0, and the period after that 0.01 again. This is normal by design.

The aim of the analysis is to detect the signal despite its intermittent nature, essentially ignoring any gaps. The analysis is not done in real time, so looking ahead is acceptable. For example, if gaps of at most two periods are to be ignored, the results would be as follows (leading and trailing zeros stay FALSE because no signal precedes or follows them):

signal  detection
0       FALSE
0.01    TRUE
0.036   TRUE
0       TRUE
0.2     TRUE
0       FALSE
0       FALSE
0       FALSE
0       FALSE
0.5     TRUE
0       TRUE
0       TRUE
0.1     TRUE
0.0     FALSE
0.0     FALSE
0.0     FALSE


Solution: as programming skills are still at a beginner level, the following function was written; the code after the function contains a demonstration. Its deficiency is that it always extends the detection by the length of the gap it ignores: it does not look ahead to check whether the signal actually reappears within the gap.

#%%
from IPython.display import display
import pandas as pd

#%%

def find_continuous(df,threshold,max_gap):
    #df - series containing data
    #threshold - minimum value (inclusive to detect)
    #max_gap - number of time periods after the last value >= threshold to be considered as still containing the signal
    
    i = 0 
    min_value=threshold
    currentlyPriming = False
    primeTimes = []
    PrimeTrue=[]
    Prime2=[]
    distance=0
    distance_to_check=0

    distance_checked=1
        
    while i < (len(df)):
        
        print('element equals ',df.iloc[i],', index is ',df.index[i],', current i is ',i)
        
        if df.iloc[i] < min_value and len(PrimeTrue)>0:
            
            currentlyPriming=False
            print ('currently priming set to False')
            
            print('last index element in primeTrue list is ',PrimeTrue[-1])
            
            
            if max_gap==0:

                print('max gap is at zero')
                
            elif max_gap==1 and df.index[i]-PrimeTrue[-1]==1:
                
                print('max gap is 1 and this element is next after positive')
                primeTimes.append(df.index[i])    
            
            elif max_gap>=2:
                
                try:
                    distance=(Prime2[-1]-PrimeTrue[-1]) 
                    distance_to_check=max(max_gap-distance,0)
                    print('last index element in Prime2 list is ',Prime2[-1])

                except IndexError:
                    
                    print('Prime2 has not been initiated, first clustering detection')
                    distance=88888
                    distance_to_check=max(max_gap-1,0)
                    
                print('distance is ',distance,' distance to check is ', distance_to_check )
                            
                if distance_to_check>0:

                    primeTimes.append(df.index[i])
                    Prime2.append(df.index[i])
                    distance_checked+=1
                    print('distance checked is ',distance_checked)
                    
                    
                elif distance_to_check==0:
                    
                    distance_checked=1

        elif df.iloc[i] < min_value and len(PrimeTrue)==0:
            
            currentlyPriming=False
            print('element is less than minimum value and element greater than minimum value was not found yet')    
        

        elif df.iloc[i] >= min_value:
            
            PrimeTrue.append(df.index[i])

            if currentlyPriming:

                primeTimes.append(df.index[i])
                print('section d, priming is ',currentlyPriming )
            
            elif not currentlyPriming:

                primeTimes.append(df.index[i])
                currentlyPriming = True

                print('section f, priming is ',currentlyPriming )

        i += 1

    return primeTimes

# %%

if __name__ == "__main__":
    values=[0.05,0,0,0,0,0.037037037,0,0,0,0.035714286,0,0.05,0,0,0,0,0,0,0,0.025677,0,0.05,0,0,0,0.04,0,0.031037037,0,0,0,0,0,0.04,0,0,0,0.074074074,0,0.032258065,0,0,0,0.001,0,0,0,0,0,0,0,0,0,0,0.060606061,0,0,0,0.060606061,0,0,0,0,0,0,0,0,0]


    v1=pd.DataFrame(data=values,index=None,columns=['values'])
    list2=[]

    list2=find_continuous(v1['values'],0.035,2)



    for k in range(len(list2)):
        print(k)
        v1.at[list2[k],'cluster']=list2[k]



    with pd.option_context("display.max_rows", v1.shape[0]):
        display(v1)

Question: is there a better way to write this in Python, and how would a highly skilled Python developer write it?

Thank you!
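For comparison, the look-ahead that `find_continuous` lacks can be expressed with a few vectorized pandas operations. This is a minimal sketch, not the asker's method: the name `bridge_gaps` is hypothetical, `threshold` is treated as inclusive (as in the original comment), and a gap is bridged only when a detection exists both before and after it, which is what the expected table implies.

```python
import numpy as np
import pandas as pd


def bridge_gaps(series: pd.Series, threshold: float, max_gap: int) -> pd.Series:
    """Return a boolean Series that is True where the signal counts as present.

    Values >= threshold are detections. A run of below-threshold values is
    bridged only if it is at most max_gap periods long and has a detection
    on both sides (leading and trailing runs are never bridged).
    """
    is_signal = series >= threshold
    # integer position of every row, aligned with the series' own index
    pos = pd.Series(np.arange(len(series)), index=series.index)
    # position of the most recent detection at or before each row (NaN if none)
    prev_sig = pos.where(is_signal).ffill()
    # look-ahead: position of the next detection at or after each row (NaN if none)
    next_sig = pos.where(is_signal).bfill()
    # length of the gap each row sits in; comparisons with NaN evaluate to False
    gap_len = next_sig - prev_sig - 1
    return is_signal | (gap_len <= max_gap)


if __name__ == "__main__":
    df = pd.DataFrame({"signal": [0, 0.01, 0.036, 0, 0.2, 0, 0, 0, 0, 0.5, 0, 0, 0.1, 0.0, 0.0, 0.0]})
    df["detection"] = bridge_gaps(df["signal"], threshold=0.01, max_gap=2)
    print(df)
```

Because `ffill`/`bfill` carry row positions rather than values, every row knows how far it is from the nearest detection on either side. For the question's demo, `mask = bridge_gaps(v1['values'], 0.035, 2)` followed by `mask[mask].index.tolist()` gives a list of detected indices in the same form as `primeTimes`.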


Solution

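  • I think the simplest approach is to number each run of consecutive zero values as a group, count the size of each group, and then use conditions on the group type and size to determine the detection value (this treats any non-zero value as signal). This replaces the hand-written while loop with a few pandas operations and is easier to follow: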

    import pandas as pd
    import numpy  as np
    
    df= pd.DataFrame({'signal': [0, 0.01, 0.036, 0, 0.2, 0, 0, 0, 0, 0.5, 0, 0, 0.1, 0.0, 0.0, 0.0]})
    
    # module-level state used by func; reset both before mapping a new series
    gp = 0
    gp_prev = False
    def func(x):
        global gp, gp_prev
        if x == 0:
            if not gp_prev:
                gp += 1
            gp_prev = True
            return gp
        else:
            gp_prev = False
            return 0
    
    # use func to map and number groupings of zero and non-zero groups
    df['group'] = df['signal'].map(func)
    
    # collect on the groups, and record their sizes in column gsize
    df['gsize'] = df.groupby('group')['signal'].transform('size')
    
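    # mark detection True or False according to the type and size of each group:
    # zero groups longer than 2 periods are not bridged, and neither are zero
    # groups at the start or end of the series (no signal on one side of them)
    too_long = (df['group'] > 0) & (df['gsize'] > 2)
    leading = (df['group'] == 1) & (df.loc[0, 'signal'] == 0)
    trailing = (df['group'] == df['group'].max()) & (df['signal'].iloc[-1] == 0)
    df['detection'] = np.where(too_long | leading | trailing, False, True)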
    
    # clean up by dropping the temporary columns
    df_final = df.drop(['group','gsize'], axis = 1)
    
    print(df)
    

    which gives the following (printing df rather than df_final, to show the temporary working columns):

        signal  group  gsize  detection
    0    0.000      1      1      False
    1    0.010      0      5       True
    2    0.036      0      5       True
    3    0.000      2      1       True
    4    0.200      0      5       True
    5    0.000      3      4      False
    6    0.000      3      4      False
    7    0.000      3      4      False
    8    0.000      3      4      False
    9    0.500      0      5       True
    10   0.000      4      2       True
    11   0.000      4      2       True
    12   0.100      0      5       True
    13   0.000      5      3      False
    14   0.000      5      3      False
    15   0.000      5      3      False
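The same grouping can be sketched without the module-level `gp`/`gp_prev` state, which makes `func` produce different group numbers if it is run a second time. A zero run starts wherever a zero follows a non-zero value (or on the first row), so a cumulative sum over those start points numbers the runs. This is an alternative sketch rather than the answer's original code; `max_gap = 2` mirrors the question, and treating zero runs at either end of the series as undetected is an assumption taken from the question's table.

```python
import pandas as pd

df = pd.DataFrame({'signal': [0, 0.01, 0.036, 0, 0.2, 0, 0, 0, 0, 0.5, 0, 0, 0.1, 0.0, 0.0, 0.0]})
max_gap = 2  # longest zero run that is still bridged

is_zero = df['signal'].eq(0)
# a zero run starts on a zero whose predecessor is non-zero (or on the first row)
prev_zero = is_zero.shift(1, fill_value=False).astype(bool)
run_start = is_zero & ~prev_zero
# number zero runs 1, 2, 3, ...; every non-zero row gets group 0
df['group'] = run_start.astype(int).cumsum().where(is_zero, 0)
# size of the group each row belongs to
df['gsize'] = df.groupby('group')['signal'].transform('size')

# zero runs touching either end of the series have no signal on one side
edge_groups = [g for g in (df['group'].iloc[0], df['group'].iloc[-1]) if g > 0]
is_gap = df['group'] > 0
undetected = is_gap & ((df['gsize'] > max_gap) | df['group'].isin(edge_groups))
df['detection'] = ~undetected

print(df)
```

For this input the `group`, `gsize` and `detection` columns should match the output shown above.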