Search code examples
pythonscipyfilteringsignal-processingreal-time

How to real-time filter with scipy and lfilter?


Disclaimer: I am probably not as good at DSP as I should be and therefore have more issues than I should have getting this code to work.

I need to filter incoming signals as they happen. I tried to make this code to work, but I have not been able to so far. Referencing scipy.signal.lfilter doc

import numpy as np
import scipy.signal
import matplotlib.pyplot as plt
from lib import fnlib

samples = 100
x = np.linspace(0, 7, samples)
y = [] # Unfiltered output
y_filt1 = [] # Real-time filtered

nyq = 0.5 * samples
f1_norm = 0.1 / nyq
f2_norm = 2 / nyq
b, a = scipy.signal.butter(2, [f1_norm, f2_norm], 'band', analog=False)
zi = scipy.signal.lfilter_zi(b,a)
zi = zi*(np.sin(0) + 0.1*np.sin(15*0))

This sets zi as zi*y[0 ] initially, which in this case is 0. I have got it from the example code in the lfilter documentation, but I am not sure if this is correct at all.

Then it comes to the point where I am not sure what to do with the few initial samples. The coefficients a and b are len(a) = 5 here. As lfilter takes input values from now to n-4, do I pad it with zeroes, or do I need to wait until 5 samples have gone by and take them as a single bloc, then continuously sample each next step in the same way?

for i in range(0, len(a)-1): # Append 0 as initial values, wrong?
    y.append(0)

step = 0
for i in xrange(0, samples): #x:
    tmp = np.sin(x[i]) + 0.1*np.sin(15*x[i])
    y.append(tmp)

    # What to do with the inital filterings until len(y) ==  len(a) ?

    if (step> len(a)):
        y_filt, zi = scipy.signal.lfilter(b, a, y[-len(a):], axis=-1, zi=zi)
        y_filt1.append(y_filt[4])

print(len(y))
y = y[4:]
print(len(y))
y_filt2 = scipy.signal.lfilter(b, a, y) # Offline filtered

plt.plot(x, y, x, y_filt1, x, y_filt2)
plt.show()

Solution

  • I think I had the same problem, and found a solution on https://github.com/scipy/scipy/issues/5116:

    from scipy import zeros, signal, random
    
    def filter_sbs():
        data = random.random(2000)
        b = signal.firwin(150, 0.004)
        z = signal.lfilter_zi(b, 1) * data[0]
        result = zeros(data.size)
        for i, x in enumerate(data):
            result[i], z = signal.lfilter(b, 1, [x], zi=z)
        return result
    
    if __name__ == '__main__':
        result = filter_sbs()
    

    The idea is to pass the filter state z in each subsequent call to lfilter. For the first few samples the filter may give strange results, but later (depending on the filter length) it starts to behave correctly.