
How do I find, plot, and output the peaks of a live plotted Fast Fourier Transform (FFT) in Python?


I am working with the pyaudio and matplotlib packages for the first time. I am attempting to plot live audio data from microphone input, transform it to the frequency domain, and then mark the peaks, given a user-supplied minimum distance between them. This project is a modification of the three-part guide to build a spectrum analyzer found here.

Currently the code is organized as a class because I apply other methods to the audio as well, but I am only posting the class with the relevant methods, since they are self-contained and don't reference each other. Another quirk of the program is that it opens a local file even though it only uses input from the user's microphone; this is a leftover from the original functionality of plotting a sound file's intensity while it played and is no longer integral to the code.

import pyaudio
import wave
import struct
import pandas as pd
from scipy.fftpack import fft
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import numpy as np

class Wave:
    def __init__(self, file) -> None:
        self.CHUNK = 1024 * 4
        self.obj = wave.open(file, "r")
        self.callback_output = []
        self.data = self.obj.readframes(self.CHUNK)
        self.rate = 44100

        # Initiate an instance of PyAudio
        self.p = pyaudio.PyAudio()

        # Open a stream with the file specifications
        self.stream = self.p.open(format = pyaudio.paInt16,
                                  channels = self.obj.getnchannels(),
                                  rate = self.rate,
                                  output = True,
                                  input = True,
                                  frames_per_buffer = self.CHUNK)
        
    def fft_plot(self, distance: float):
        x_fft = np.linspace(0, self.rate, self.CHUNK)
        fig, ax = plt.subplots()
        line_fft, = ax.semilogx(x_fft, np.random.rand(self.CHUNK), "-", lw = 2)

        # Bind plot window sizes
        ax.set_xlim(20, self.rate / 2)

        plot_data = self.stream.read(self.CHUNK)
        self.data_int = pd.DataFrame(struct.unpack(\
                        str(self.CHUNK * 2) + 'h', plot_data)).astype(dtype = "b")[::2]

        y_fft = fft(self.data_int)
        line_fft.set_ydata(np.abs(y_fft[0:self.CHUNK]) / (256 * self.CHUNK))

        plt.show(block = False)

        while True:
            # Read incoming audio data
            data = self.stream.read(self.CHUNK)
            
            # Convert data to bits then to array
            self.data_int = struct.unpack(str(4 * self.CHUNK) + 'B', data)
            
            # Recompute FFT and update line
            yf = fft(self.data_int)
            line_data = np.abs(yf[0:self.CHUNK])  / (128 * self.CHUNK)
            line_fft.set_ydata(line_data)

            # Find peaks that are at least `distance` samples apart
            peaks, _ = find_peaks(line_data, distance = distance)

            # Update the plot
            plt.plot(peaks, line_data[peaks], "x")
            fig.canvas.draw()
            fig.canvas.flush_events()

            # Exit program when plot window is closed
            fig.canvas.mpl_connect('close_event', exit)

test_file = "C:/Users/Tam/Documents/VScode/Final Project/PrismGuitars.wav"

audio_test = Wave(test_file)
audio_test.fft_plot(2000)

The code throws no errors, runs at an acceptable frame rate, and terminates only when the plot window is closed, all of which is good. The issue I'm encountering is with finding and plotting the peaks of line_data: when I run this code, the output over time looks like this matplotlib graph instance.

It seems that the peaks (or peak) are being found but at a lower frequency than the x of line_data and as such are shifted comparatively. The other, more minor, issue is that since this is a live plot I would like to clear the previous instance of the peak marker so that it only shows the current instance and not all of the ones plotted prior.

I have attempted in prior fixes to use the line_fft in the peak detection but as it is cast to a Line2D format the peak detection algorithm isn't able to deal with the data type. I have also tried implementing a list comprehension as seen in this post but the time to cast to list is prohibitively slow and did not return any peak markers when I ran it.

EDIT: Following Jody's input the program now returns the proper values as I was only printing an index for the x-coordinate of the peak marker. Nevertheless I would still appreciate some insight as to whether it is possible to update per marker rather than having all the previous ones constantly displayed.

As for the marker updating I have attempted to clear the plot in the while loop both before and after drawing the markers (in different tests of course) but I only ever end up with a completely blank graph.

Please let me know if there is anything I should clarify and thank you for your time.


Solution

  • As Jody pointed out, the peaks variable contains the indices of the detected peaks, not their frequencies. These indices must be used to look up the matching values in both x_fft and line_data so that the markers line up with the displayed data.

    First, create an empty scatter plot once, before the while loop:

    scat = ax.scatter([], [], c = "purple", marker = "x")
    

    Inside the while loop, the peak coordinates can then be stacked into a single two-column array:

    array_peaks = np.c_[x_fft[peaks], line_data[peaks]]
    

    The scatter plot is then updated in place, still inside the while loop, which replaces the previous markers rather than adding new ones:

    scat.set_offsets(array_peaks)
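
For anyone who wants to try the pattern without a microphone, here is a minimal sketch of the three steps above put together. It is not the code from the question: the helper names fake_chunk and peak_offsets are hypothetical, the audio is synthetic sine tones rather than a pyaudio stream, and it swaps in np.fft.rfft with np.fft.rfftfreq so that the x-axis bins and the spectrum have the same length by construction. The headless Agg backend is only there so the sketch runs without a display.

```python
import matplotlib
matplotlib.use("Agg")  # assumption: headless backend so the sketch runs anywhere
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import find_peaks

RATE = 44100       # sample rate in Hz, as in the question
CHUNK = 1024 * 4   # samples per buffer, as in the question


def fake_chunk(freqs, rate=RATE, n=CHUNK):
    """Hypothetical stand-in for stream.read(): a sum of sine tones."""
    t = np.arange(n) / rate
    return sum(np.sin(2 * np.pi * f * t) for f in freqs)


def peak_offsets(x, y, distance):
    """Return an (N, 2) array of (x, y) coordinates for the peaks of y."""
    peaks, _ = find_peaks(y, distance=distance)  # peaks holds indices into y
    return np.c_[x[peaks], y[peaks]]             # look up real x and y values


# Frequency of each rfft bin, in Hz
x_fft = np.fft.rfftfreq(CHUNK, d=1 / RATE)

fig, ax = plt.subplots()
line_fft, = ax.semilogx(x_fft, np.zeros_like(x_fft), "-", lw=2)
scat = ax.scatter([], [], c="purple", marker="x")  # created once, outside the loop
ax.set_xlim(20, RATE / 2)
ax.set_ylim(0, 1)

# Two simulated "frames" in place of the while True loop
for tones in ([440.0], [880.0, 3000.0]):
    samples = fake_chunk(tones)
    line_data = np.abs(np.fft.rfft(samples)) / CHUNK
    line_fft.set_ydata(line_data)

    # Replace the previous markers with the current frame's peaks
    offsets = peak_offsets(x_fft, line_data, distance=50)
    scat.set_offsets(offsets)
    fig.canvas.draw()
```

Because set_offsets overwrites the marker positions held by the existing scatter artist, only the current frame's peaks are drawn and nothing needs to be cleared. With distance as the only criterion, find_peaks also keeps small local maxima in the noise floor; passing its height argument as well would be one way to suppress those, if that matters for your use.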