Tags: python, pandas, time-series, technical-indicator

Pandas: Zigzag segmentation of data based on local minima-maxima


I have time-series data, generated as follows:

import numpy as np
import pandas as pd

date_rng = pd.date_range('2019-01-01', freq='s', periods=400)
df = pd.DataFrame(np.random.lognormal(.005, .5, size=(len(date_rng), 3)),
                  columns=['data1', 'data2', 'data3'],
                  index=date_rng)
s = df['data1']

I want to draw a zigzag line connecting the local maxima and local minima, subject to one condition: on the y-axis, |highest - lowest value| of each zigzag segment must exceed both a percentage (say 20%) of the height of the previous zigzag segment AND a pre-stated value k (say 1.2).
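
Written as a formula, one possible reading of this condition is the following. The symbols $e_i$, $h_i$ and $p$ are introduced here for illustration only and are an assumption about what is meant: let $e_0, e_1, \dots$ be the extrema kept for the zigzag, and let $h_i$ be the height of the $i$-th segment.

```latex
h_i = \lvert e_i - e_{i-1} \rvert,
\qquad
\text{keep } e_i \text{ only if } \quad h_i > p \cdot h_{i-1} \quad \text{and} \quad h_i > k,
\qquad p = 0.2, \; k = 1.2
```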

I can find the local extrema using this code:

from scipy import signal

# Find peaks (local maxima).
peak_indexes = signal.argrelextrema(s.values, np.greater)
peak_indexes = peak_indexes[0]

# Find valleys (local minima).
valley_indexes = signal.argrelextrema(s.values, np.less)
valley_indexes = valley_indexes[0]

# Merge peak and valley data points using pandas.
# .iloc is used because the indexes are integer positions, not date labels.
df_peaks = pd.DataFrame({'date': s.index[peak_indexes], 'zigzag_y': s.iloc[peak_indexes]})
df_valleys = pd.DataFrame({'date': s.index[valley_indexes], 'zigzag_y': s.iloc[valley_indexes]})
df_peaks_valleys = pd.concat([df_peaks, df_valleys], axis=0, ignore_index=True, sort=True)

# Sort peak and valley datapoints by date.
df_peaks_valleys = df_peaks_valleys.sort_values(by=['date'])

but I don't know how to apply the threshold condition to these extrema. How can I apply it?

Since the data could contain millions of timestamps, an efficient calculation is strongly preferred.

For a clearer description, see the image: [image]

Example output, from my data:

import matplotlib.pyplot as plt
import matplotlib.dates as mdates

# Instantiate axes.
(fig, ax) = plt.subplots()
# Plot zigzag trendline.
ax.plot(df_peaks_valleys['date'].values, df_peaks_valleys['zigzag_y'].values,
        color='red', label="Zigzag")

# Plot original line.
ax.plot(s.index, s, linestyle='dashed', color='black', label="Org. line", linewidth=1)

# Format time.
ax.xaxis_date()
ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))

plt.gcf().autofmt_xdate()   # Beautify the x-labels
plt.autoscale(tight=True)

plt.legend(loc='best')
plt.grid(True, linestyle='dashed')


My desired output is something similar to this, where the zigzag only connects the significant segments: [image]


Solution

  • I have answered to the best of my understanding of the question. However, it is not clear to me how the variable k influences the filter.

    You want to filter the extrema based on a running condition. I assume that you want to mark all extrema whose relative distance to the last marked extremum is larger than p%. I further assume that you always consider the first element of the timeseries a valid/relevant point.

    I implemented this with the following filter function:

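    # Note: this name shadows Python's built-in filter().
    def filter(values, percentage):
        # Work on a plain array so that indexing is positional, not label-based.
        values = np.asarray(values)
        # The first value is always kept.
        previous = values[0]
        mask = [True]
        for value in values[1:]:
            # Change relative to the last kept extremum.
            relative_difference = np.abs(value - previous)/previous
            if relative_difference > percentage:
                previous = value
                mask.append(True)
            else:
                mask.append(False)
        return mask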
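
As a quick sanity check of this rule, here is a minimal, self-contained sketch that applies the same running comparison to a small hand-made sequence. The name `relative_change_mask` and the sample values are made up for illustration. It mirrors the function above but returns a NumPy boolean array, and it assumes the values are non-zero (as they are for lognormal data).

```python
import numpy as np

def relative_change_mask(values, percentage):
    """Hypothetical helper: keep a point when it differs from the last kept
    point by more than `percentage`, relative to that last kept point."""
    values = np.asarray(values, dtype=float)
    mask = np.zeros(len(values), dtype=bool)
    if len(values) == 0:
        return mask
    mask[0] = True  # assumption: the first point is always kept
    previous = values[0]
    for i in range(1, len(values)):
        # Assumes previous != 0; otherwise the ratio is undefined.
        if abs(values[i] - previous) / abs(previous) > percentage:
            mask[i] = True
            previous = values[i]
    return mask

sample = [1.0, 1.1, 1.5, 1.6, 1.0]  # made-up values
print(relative_change_mask(sample, 0.2))
```

With a 20% threshold, 1.1 and 1.6 are dropped because they lie within 20% of the last kept point (1.0 and 1.5 respectively), while 1.5 and the final 1.0 are kept.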
    

    To run your code, I first import dependencies:

    from scipy import signal
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    

    To make the code reproducible, I fix the random seed:

    np.random.seed(0)
    

    The rest from here is copied from your question. Note that I decreased the number of samples to make the result clearer.

    date_rng = pd.date_range('2019-01-01', freq='s', periods=30)
    df = pd.DataFrame(np.random.lognormal(.005, .5,size=(len(date_rng), 3)),
                      columns=['data1', 'data2', 'data3'],
                      index= date_rng)
    s = df['data1']
    # Find peaks(max).
    peak_indexes = signal.argrelextrema(s.values, np.greater)
    peak_indexes = peak_indexes[0]
    # Find valleys(min).
    valley_indexes = signal.argrelextrema(s.values, np.less)
    valley_indexes = valley_indexes[0]
    # Merge peaks and valleys data points using pandas.
    # .iloc is used because the indexes are integer positions, not date labels.
    df_peaks = pd.DataFrame({'date': s.index[peak_indexes], 'zigzag_y': s.iloc[peak_indexes]})
    df_valleys = pd.DataFrame({'date': s.index[valley_indexes], 'zigzag_y': s.iloc[valley_indexes]})
    df_peaks_valleys = pd.concat([df_peaks, df_valleys], axis=0, ignore_index=True, sort=True)
    # Sort peak and valley datapoints by date.
    df_peaks_valleys = df_peaks_valleys.sort_values(by=['date'])
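
Since the question mentions millions of timestamps, the concatenate-and-sort step could possibly be avoided. The sketch below is not the code from the question: it swaps in a plain NumPy neighbour comparison, intended to find the same strict interior peaks and valleys, and returns their positions already in time order. The function name `local_extrema_positions` and the sample array are made up for illustration.

```python
import numpy as np

def local_extrema_positions(x):
    """Hypothetical helper: positions of strict local peaks and valleys,
    found by comparing each interior point with its two neighbours."""
    x = np.asarray(x, dtype=float)
    inner = x[1:-1]
    is_peak = (inner > x[:-2]) & (inner > x[2:])
    is_valley = (inner < x[:-2]) & (inner < x[2:])
    # +1 converts positions within `inner` back to positions within `x`.
    return np.flatnonzero(is_peak | is_valley) + 1

x = np.array([1.0, 3.0, 2.0, 2.5, 0.5, 0.7])  # made-up values
positions = local_extrema_positions(x)
print(positions)     # positions in increasing (time) order
print(x[positions])  # the extrema values at those positions
```

Because the positions come out in increasing order, they could be used directly as `s.index[positions]` and `s.values[positions]` without sorting by date. The first and last samples are never reported, since they have only one neighbour.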
    

    Then we use the filter function:

    p = 0.2 # 20% 
    filter_mask = filter(df_peaks_valleys.zigzag_y, p)
    filtered = df_peaks_valleys[filter_mask]
    

    Then plot, as you did, both your original extrema and the newly filtered ones:

    # Instantiate axes.
    (fig, ax) = plt.subplots(figsize=(10,10))
    # Plot all extrema.
    ax.plot(df_peaks_valleys['date'].values, df_peaks_valleys['zigzag_y'].values,
            color='red', label="Extrema")
    # Plot the filtered zigzag trendline.
    ax.plot(filtered['date'].values, filtered['zigzag_y'].values,
            color='blue', label="ZigZag")
    
    # Plot original line.
    ax.plot(s.index, s, linestyle='dashed', color='black', label="Org. line", linewidth=1)
    
    # Format time.
    ax.xaxis_date()
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
    
    plt.gcf().autofmt_xdate()   # Beautify the x-labels
    plt.autoscale(tight=True)
    
    plt.legend(loc='best')
    plt.grid(True, linestyle='dashed')
    


    EDIT:

    If you want to consider both the first and the last point as valid, you can adapt the filter function as follows:

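    def filter(values, percentage):
        # work on a plain array so that indexing is positional, not label-based
        values = np.asarray(values)
        # the first value is always valid
        previous = values[0]
        mask = [True]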
        # evaluate all points from the second to (n-1)th
        for value in values[1:-1]: 
            relative_difference = np.abs(value - previous)/previous
            if relative_difference > percentage:
                previous = value
                mask.append(True)
            else:
                mask.append(False)
        # the last value is always valid
        mask.append(True)
        return mask
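
Since the role of k is not clear to me, the following is only a sketch of one possible interpretation, not a definitive implementation: a new point is kept when the height of the segment it would create exceeds both `p` times the height of the previously kept segment and the absolute value `k`. The function name `zigzag_mask` and the sample values are hypothetical. Note that this sketch does not enforce strict peak/valley alternation.

```python
import numpy as np

def zigzag_mask(values, p=0.2, k=1.2):
    """Hypothetical helper: mark extrema whose segment height exceeds both
    p * (height of the previously kept segment) and the absolute value k."""
    values = np.asarray(values, dtype=float)
    mask = np.zeros(len(values), dtype=bool)
    if len(values) == 0:
        return mask
    mask[0] = True         # assumption: the first extremum is always kept
    previous = values[0]
    previous_height = 0.0  # assumption: no previous segment at the start
    for i in range(1, len(values)):
        height = abs(values[i] - previous)
        if height > p * previous_height and height > k:
            mask[i] = True
            previous = values[i]
            previous_height = height
    return mask

extrema = [1.0, 3.0, 2.9, 0.5, 0.6, 4.0]  # made-up extrema values
print(zigzag_mask(extrema, p=0.2, k=1.2))
```

In this example the small moves 3.0 to 2.9 and 0.5 to 0.6 are skipped, and the remaining points form the zigzag. The loop is a single pass over the extrema, so its cost grows linearly with their number.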