Search code examples
pythonscipyfftdft

determining and filtering out low frequent component from a real dataset


I am trying to use FFT to filter out low frequency components from a signal and retain high frequency components in a real dataset (hourly electricity demand in California). I have tried this so far:

X = fft(df['demand'])
y_fft_shift = fftshift(X)
n = 20 # number of low freq components to be removed (selected randomly) 
m = len(X)/2
sig_fft_filtered_img = y_fft_shift.copy()
sig_fft_filtered_img[int(m-n):int(m+n+1)] = 0
y_ifft_shift = ifftshift(sig_fft_filtered_img)
y_ifft = ifft(y_ifft_shift)

# compare original signal vs filtered signal 
plt.figure(figsize = (25, 6))
plt.plot(df['demand'],'b') #df['hour'], 
plt.plot(abs(y_ifft.real),'r')
plt.xlabel('Datetime')
plt.ylabel('demand')
plt.title('original vs filtered signal')
plt.xticks(rotation=25) 
plt.show()

I am not sure whether (a) my implementation is correct, and (b) the results obtained from inverse discrete fourier transform is the expected results. For instance, if I don't take abs(y_ifft.real) I get negative values.

demand original vs filtered

I tried the following two approaches on synthetic signal before applying the second approach to the real dataset.

from scipy.fftpack import fft, ifft, fftfreq
sr = 2000 # sampling rate
ts = 1.0/sr # sampling interval
t = np.arange(0,1,ts)

#generate a signal
freq = 1. 
x = 3*np.sin(2*np.pi*freq*t)

freq = 4.
x += np.sin(2*np.pi*freq*t)

freq = 7.   
x += 0.5* np.sin(2*np.pi*freq*t)

y = fft(x, axis=0) #FT of original signal
freq = fftfreq(len(x), d=1.0/len(x)) #compute freq.
# define the cut-off frequency
cut_off = 4.5

# high-pass filter by assign zeros to the 
# FFT amplitudes where the absolute 
# frequencies smaller than the cut-off 
sig_fft_filtered[np.abs(freq) < cut_off] = 0

# get the filtered signal in time domain
filtered = ifft(sig_fft_filtered)

I compared the output from the above with the below code with a criterion that I want to remove only lowest four frequency component:

y = fft(x, axis=0)
y_fft_shift = fftshift(y)
n = 4
m = len(x)/2
# y_fft_shift[m-n+1:m+n+1] = 0 
sig_fft_filtered_img = y_fft_shift.copy()
sig_fft_filtered_img[int(m-n):int(m+n+1)] = 0
y_ifft_shift = ifftshift(sig_fft_filtered_img)
y_ifft = ifft(y_ifft_shift)

enter image description here

Dataset used: link to electricity demand dataset used above

P.S.: Many answers on SO helped me to understand the concept on image denoising using FFT as well as on synthetic data but couldn't find much on applying FFT on real dataset

Ref: High Pass Filter for image processing in python by using scipy/numpy

How to interpret the output of scipy.fftpack.fft?

How to interpret the results of the Discrete Fourier Transform (FFT) in Python

Understanding FFT output in python


Solution

  • Don't use the complex versions of the FFT; use the real ones.

    import matplotlib
    import numpy as np
    from matplotlib import pyplot as plt
    
    sr = 2_000  # sampling rate
    ts = 1/sr  # sampling interval
    t = np.arange(0, 1, ts)
    
    # generate a signal
    freq = 1
    y = 3*np.sin(2*np.pi*freq*t)
    
    freq = 4
    y += np.sin(2*np.pi*freq*t)
    
    freq = 7
    y += 0.5*np.sin(2*np.pi*freq*t)
    
    fft = np.fft.rfft(y)  # FT of original signal
    f = np.fft.rfftfreq(len(y), d=ts)  # compute freq.
    
    # define the cut-off frequency
    f_cutoff = 4.5
    i_cutoff = round(f_cutoff/sr * len(t))
    
    # high-pass filter by assign zeros to the
    # FFT amplitudes where the absolute
    # frequencies smaller than the cut-off
    fft_filtered = fft.copy()
    fft_filtered[:1 + i_cutoff] = 0
    
    # get the filtered signal in time domain
    y_filtered = np.fft.irfft(fft_filtered)
    
    matplotlib.use('TkAgg')
    
    ax_t: plt.Axes
    ax_f: plt.Axes
    fig, (ax_t, ax_f) = plt.subplots(nrows=2, ncols=1)
    
    ax_t.plot(t, y, label='unfiltered')
    ax_t.plot(t, y_filtered, label='filtered')
    ax_t.set_xlabel('t (s)')
    ax_t.set_ylabel('y')
    ax_t.legend()
    
    ax_f.loglog(f, np.abs(fft), label='unfiltered')
    ax_f.loglog(f, np.abs(fft_filtered), label='filtered')
    ax_f.set_xlabel('f (Hz)')
    ax_f.set_ylabel('|y|')
    ax_f.legend()
    
    plt.show()
    

    plots