Search code examples
pythonalgorithmpandasnumpysmoothing

Algorithm for smoothing a curve strictly above the original


I have an arbitrary input curve, given as numpy array. I want to create a smoothed version of it, similar to a rolling mean, but which is strictly greater than the original and strictly smooth. I could use the rolling mean value but if the input curve has a negative peak, the smoothed version will drop below the original around that peak. I could then simply use the maximum of this and the original but that would introduce non-smooth spots where the transition occurs.

Furthermore, I would like to be able to parameterize the algorithm with a look-ahead and a look-behind for this resulting curve, so that given a large look-ahead and a small look-behind the resulting curve would rather stick to the falling edges, and with a large look-behind and a small look-ahead it would rather be close to rising edges.

I tried using the pandas.Series(a).rolling() facility to get rolling means, rolling maxima, etc., but up to now I found no way to generate a smoothed version of my input which in all cases stays above to input.

I guess there is a way to combine rolling maxima and rolling means somehow to achieve what I want, so here is some code for computing these:

import pandas as pd
import numpy as np

my input curve:

original = np.array([ 5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7 ])

This can be padded left (pre) and right (post) with the edge values as a preparation for any rolling function:

pre = 2
post = 3
padded = np.pad(original, (pre, post), 'edge')

Now we can apply a rolling mean:

smoothed = pd.Series(padded).rolling(
    pre + post + 1).mean().get_values()[pre+post:]

But now the smoothed version is below the original, e. g. at index 4:

print(original[4], smoothed[4])  # 8 and 5.5

To compute a rolling maximum, you can use this:

maximum = pd.Series(padded).rolling(
    pre + post + 1).max().get_values()[pre+post:]

But a rolling maximum alone would of course not be smooth in many cases and would display a lot of flat tops around the peaks of the original. I would prefer a smooth approach to these peaks.

If you have also pyqtgraph installed, you can easily plot such curves:

import pyqtgraph as pg
p = pg.plot(original)
p.plotItem.plot(smoothed, pen=(255,0,0))

(Of course, other plot libraries would do as well.)

What I would like to have as a result is a curve which is e. g. like the one formed by these values:

goal = np.array([ 5, 7, 7.8, 8, 8, 8, 7, 5, 3.5, 3, 4, 5.5, 6.5, 7 ])

Here is an image of the curves. The white line is the original (input), the red the rolling mean, the green is about what I would like to have:

curve plot

EDIT: I just found the functions baseline() and envelope() of a module named peakutils. These two functions can compute polynomials of a given degree fitting the lower resp. upper peaks of the input. For small samples this can be a good solution. I'm looking for something which can also be applied on very large samples with millions of values; then the degree would need to be very high and the computation then also takes a considerate amount of time. Doing it piecewise (section for section) opens up a bunch of new questions and problems (like how to stitch properly while staying smooth and guaranteed above the input, performance when processing a massive amount of pieces etc.), so I'd like to avoid that if possible.

EDIT 2: I have a promising approach by a repetitively applying a filter which creates a rolling mean, shifts it slightly to the left and the right, and then takes the maximum of these two and the original sample. After applying this several times, it smoothes out the curve in the way I wanted it. Some unsmooth spots can remain, though, in deep valleys. Here is the code for this:

pre = 30
post = 30
margin = 10
s = [ np.array(sum([[ x ] * 100 for x in
      [ 5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7 ]], [])) ]
for _ in range(30):
  s.append(np.max([
    pd.Series(np.pad(s[-1], (margin+pre, post), 'edge')).rolling(
      1 + pre + post).mean().get_values()[pre+post:-margin],
    pd.Series(np.pad(s[-1], (pre, post+margin), 'edge')).rolling(
      1 + pre + post).mean().get_values()[pre+post+margin:],
    s[-1]], 0))

This creates 30 iterations of applying the filter, plotting these can be done using pyqtplot so:

p = pg.plot(original)
for q in s:
  p.plotItem.plot(q, pen=(255, 100, 100))

The resulting image looks like this: enter image description here

There are two aspects I don't like about this approach: ① It needs iterating lots of time (which slows me down), ② it still has unsmooth parts in the valleys (although in my usecase this might be acceptable).


Solution

  • I have now played around quite a bit and I think I found two main answers which solve my direct need. I will give them below.

    import numpy as np
    import pandas as pd
    from scipy import signal
    import pyqtgraph as pg
    

    These are just the necessary imports, used in all code blow. pyqtgraph is only used for displaying stuff of course, so you do not really need it.

    Symmetrical Smoothing

    This can be used to create a smooth line which is always above the signal, but it cannot distinguish between rising and falling edges, so the curve around a single peak will look symmetrical. In many cases this might be quite okay and since it is way less complex than the asymmetrical solution below (and also does not have any quirks I would know about).

    s = np.repeat([5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7], 400) + 0.1
    s *= np.random.random(len(s))
    pre = post = 400
    x = pd.Series(np.pad(s, (pre, post), 'edge')).rolling(
        pre + 1 + post).max().get_values()[pre+post:]
    y = pd.Series(np.pad(x, (pre, post), 'edge')).rolling(
        pre + 1 + post, win_type='blackman').mean().get_values()[pre+post:]
    p = pg.plot(s, pen=(100,100,100))
    for c, pen in ((x, (0, 200, 200)),
                   (y, pg.mkPen((255, 255, 255), width=3, style=3))):
        p.plotItem.plot(c, pen=pen)
    

    enter image description here

    • Create a rolling maximum (x, cyan), and
    • create a windowed rolling mean of this (z, white dotted).

    Asymmetrical Smoothing

    My usecase called for a version which allowed to distinguish between rising and falling edges. The speed of the output should be different when falling or when rising.

    Comment: Used as an envelope for a compressor/expander, a quickly rising curve would mean to dampen the effect of a sudden loud noise almost completely, while a slowly rising curve would mean to slowly compress the signal for a long time before the loud noise, keeping the dynamics when the bang appears. On the other hand, if the curve falls quickly after a loud noise, this would make quiet stuff shortly after a bang audible while a slowly falling curve would keep the dynamics there as well and only slowly expanding the signal back to normal levels.

    s = np.repeat([5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7], 400) + 0.1
    s *= np.random.random(len(s))
    pre, post = 100, 1000
    t = pd.Series(np.pad(s, (post, pre), 'edge')).rolling(
        pre + 1 + post).max().get_values()[pre+post:]
    g = signal.get_window('boxcar', pre*2)[pre:]
    g /= g.sum()
    u = np.convolve(np.pad(t, (pre, 0), 'edge'), g)[pre:]
    g = signal.get_window('boxcar', post*2)[:post]
    g /= g.sum()
    v = np.convolve(np.pad(t, (0, post), 'edge'), g)[post:]
    u, v = u[:len(v)], v[:len(u)]
    w = np.min(np.array([ u, v ]),0)
    pre = post = max(100, min(pre, post)*3)
    x = pd.Series(np.pad(w, (pre, post), 'edge')).rolling(
        pre + 1 + post).max().get_values()[pre+post:]
    y = pd.Series(np.pad(x, (pre, post), 'edge')).rolling(
        pre + 1 + post, win_type='blackman').mean().get_values()[pre+post:]
    p = pg.plot(s, pen=(100,100,100))
    for c, pen in ((t, (200, 0, 0)),
                   (u, (200, 200, 0)),
                   (v, (0, 200, 0)),
                   (w, (200, 0, 200)),
                   (x, (0, 200, 200)),
                   (y, pg.mkPen((255, 255, 255), width=3))):
        p.plotItem.plot(c, pen=pen)
    

    enter image description here

    This sequence combines ruthlessly several methods of signal processing.

    • The input signal is shown in grey. It is a noisy version of the input mentioned above.
    • A rolling maximum is applied to this (t, red).
    • Then a specially designed convolution curve for the falling edges is created (g) and the convolution is computed (u, yellow).
    • This is repeated for the rising edges with a different convolution curve (g again) and the convolution is computed (v, green).
    • The minimum of u and v is a curve having the desired slopes but is not very smooth yet; especially it has ugly spikes when the falling and the rising slope reach into each other (w, purple).
    • On this the symmetrical method above is applied:
      • Create a rolling maximum of this curve (x, cyan).
      • Create a windowed rolling mean of this curve (y, white dotted).