Tags: python, dataframe, visualization, curve-fitting

Fitting Variable Number of Lorentzian Peaks to a glob of data in text files


So I've gotten the code to work, but it's extremely slow and doesn't give the proper full width at half maximum (FWHM). Is there something I can do to get the FWHM values and speed up the processing without losing post-processing quality?

    # -*- coding: utf-8 -*-
    """
    Created on Fri Feb 16 11:38:14 2024
    
    @author: tppoirier96
    """
    
    # importing packages 
    import pandas as pd 
    import glob
    import numpy as np
    from scipy.optimize import leastsq
    import matplotlib.pyplot as plt
    
    folder_path = 'folder_path'
    # Locating folder with spectral data
    file_list = glob.glob(folder_path + "/*.txt")
    # Taking all text files from folder
    main_dataframe = pd.DataFrame(pd.read_table(file_list[0]))
    # Compiling a dataframe for each spectrum
    
    #SpectraType = str(input('What kind of spectral data is this?'))
    
    for i in range(0,len(file_list)-1):
        # for all files in the glob
        data = pd.read_table(file_list[i], sep="\t")
        # Read each text file as dataframe
        df = pd.DataFrame(data)
        # Convert each file to a dataframe
        main_dataframe = pd.concat([main_dataframe, df], axis = 1)
        # Append each dataframe to larger dataframe
    
    DataDF = pd.concat([main_dataframe.iloc[67:,0], main_dataframe.iloc[67:,1::2]], axis=1)
    # Cutting off first 110 wavenumbers due to substantial noise
    # Also removing the wavenumbers for all but the first spectra
    # Compiling each modified spectrum
    SumDataDF = DataDF.describe()
    # Summary for tracking important numbers
    DataNorm = pd.DataFrame()
    # Initializing new dataframe
    DataLorentz = pd.DataFrame()
    # Initializing new dataframe
    
    
    DataNorm = (DataDF-DataDF.min())/(DataDF.max()-DataDF.min())
    # Normalizing data
    DataNorm = pd.concat([DataDF.iloc[:,0], DataNorm], axis=1)
    # Adding wavenumbers back for visualization
    
    xData = DataNorm.iloc[:,0].array
    # Initializing wavenumber array
    E2GArray = np.zeros((4,len(file_list)))
    # Initializing E2G location array
    
    # Standard Lorentzian function
    def norm_lorentzian(x, x0, a, gamma):
        return a * gamma**2 / ( gamma**2 + ( x - x0 )**2)
    
    # For multiple peaks
    def multi_lorentz(x,params):
        off = params[0]
        paramsRest = params[1:]
        assert not (len(paramsRest) % 3)
        return off + sum([norm_lorentzian(x, *paramsRest[i:i+3]) for i in range(0, len(paramsRest), 3)])
    
    # Least squares regression method
    def res_multi_lorentz( params, xData, yData ):
        diff = [multi_lorentz(x, params) - y for x, y in zip(xData, yData)]
        return diff
    
    # Begin processing
    for i in range(1,len(DataNorm.columns)-1):
        yData = DataNorm.iloc[:,i].array
        # Converting Spectra intensities to convenient type
        #NumPeaks = int(input('How many peaks to fit?\n'))
        # Collecting number of peaks to fit
        counter = 0
        # Break condition
        generalWidth = 10
        # Starting HWHM
        yDataLoc = yData
        # Duplicating intensities for convenience
        startValues = [max(DataNorm.iloc[:,i])]
        # Taking the maximum (should be 1.0)
        while max( yDataLoc ) - min( yDataLoc ) > .1:
        # I'm not sure about this condition
            counter += 1
            # Break condition increment
            print(str(counter)+ " while")
            if counter > 20:
                print("Reached max iterations!")
                break
            minP = np.argmin( yDataLoc )
            # Index of minimum intensity (what wavenumber the baseline is)
            minY = yData[ minP ]
            # What the minimum intensity is
            x0 = xData[ minP ]
            # Setting the initial E2G position as the minimum
            startValues += [ x0, minY - max( yDataLoc ), generalWidth ]
            # Appending values to feed to lorentz function
            popt, ier = leastsq(res_multi_lorentz, startValues, args=(xData, yData))
            # Returns result of optimization and integer flag for solution
            # popt is a concatenation of final values similar to startValues
            # The final popt values are fed to the line below for each spectral intensity set
            yDataLoc = [y - multi_lorentz(x, popt) for x, y in zip(xData, yData)]
            # Stores the difference between y-values and the Lorentzian curves
            if 1300<x0<1400:
            # If the E2G is found, append it to the array for data compilation
                E2GArray[0,i-1] = i
                # Tracking which data file it came from
                E2GArray[2,i] = x0
                # Storing location of E2G
                E2GArray[3,i] = generalWidth*2
                # Storing FWHM for later
        print(str(i)+" for")
        testData = [multi_lorentz(x,popt) for x in xData]
        # Makes array of lorentzian fits
        DataLorentz = pd.concat([DataLorentz, pd.Series(testData)], axis=1)
    
        fig = plt.figure()
        ax = fig.add_subplot( 1, 1, 1 )
        ax.plot( xData, yData, label="Data" )
        ax.plot( xData, testData, label="Lorentz")
        plt.xlabel("Wavenumber")
        plt.ylabel("Intensity relative to E2G")
        plt.legend(loc="upper left")
        plt.show()
    
    # creating a new csv file with the dataframe we created
    # cwd = os.getcwd()
    # print(cwd)
    #main_dataframe.to_excel('Mapped Data.xlsx')

I would like to be able to process approximately 4000 files quickly; currently it is projected to take about a week. I would also like to store the position of the main peak (expected between 1300 and 1400, in the innermost if statement) as well as the FWHM, hence the fourth row of E2GArray.


Solution

    Methodology

    There are a few challenges to take into account:

    • Baseline management (remove background to flatten baseline);
    • Peak identification (discover where peaks are located);
    • Peak regression (regress with a variable-peak-count model and an educated initial guess).

    Then, automate the procedure for each file.

    MCVE

    The following class implements the methodology:

    import inspect
    import itertools
    import pathlib
    
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    
    from pybaselines import Baseline
    from scipy import optimize, stats, signal
        
    class SpectroscopySolver:
    
        @staticmethod
        def peak_model(x, loc, scale, height):
            """Lorentzian peak"""
            law = stats.cauchy(loc=loc, scale=scale)
            return height * law.pdf(x) / law.pdf(loc)
    
        @classmethod
        def m(cls):
            """Number of model parameters"""
            return len(inspect.signature(cls.peak_model).parameters) - 1
    
        @classmethod
        def model(cls, x, *p):
            """Variable peak model"""
            m = cls.m()
            n = len(p) // m
            y = np.zeros_like(x)
            for i in range(n):
                y += cls.peak_model(x, *p[i * m:(i + 1) * m])
            return y
    
        @classmethod
        def loss_factory(cls, xdata, ydata):
            """Least Square Factory"""
            def wrapped(p):
                """Least Square Loss function"""
                return 0.5 * np.sum(np.power(ydata - cls.model(xdata, *p), 2))
            return wrapped
    
        @classmethod
        def fit(cls, xdata, ydata, prominence=400.):
            """Fitting methodology"""
    
            # Baseline management:
            baseline_fitter = Baseline(x_data=xdata)
            baseline, params = baseline_fitter.asls(ydata)
            yb = ydata - baseline
    
            # Peak identifications:
            peaks, bases = signal.find_peaks(yb, prominence=prominence)
            lefts, rights = bases["left_bases"], bases["right_bases"]
            
            # Initial guess tuning:
            x0 = list(itertools.chain(*[[xdata[i], 0.1, p] for i, p in zip(peaks, bases["prominences"])]))
            bounds = list(itertools.chain(*[
                [(xdata[lefts[i]], xdata[rights[i]])] + [(0., np.inf)] * (cls.m() - 1) for i in range(len(peaks))
            ]))
    
            # Least Square Loss Minimization:
            loss = cls.loss_factory(xdata, yb)
            solution = optimize.minimize(loss, x0=x0, bounds=bounds)
    
            # Estimate:
            yhat = cls.model(xdata, *solution.x)
            ybhat = yhat + baseline
            
            # Plot:
            fig, axe = plt.subplots()
            axe.scatter(xdata, ydata, marker=".", color="gray", label="Data")
            axe.scatter(xdata[peaks], ydata[peaks], label="Peaks")
            axe.plot(xdata, baseline, "--", label="Baseline")
            axe.plot(xdata, ybhat, label="Fit")
            axe.set_title("Spectrogram")
            axe.set_xlabel(r"Wavenumber, $\nu$")
            axe.set_ylabel(r"Raw Signal, $g(\nu)$")
            axe.legend()
            axe.grid()
            
            return axe
    

    Then it suffices to automate this for each file:

    files = list(pathlib.Path("raman/").glob("*.txt"))
    solver = SpectroscopySolver()
    
    for file in files:
        
        data = pd.read_csv(file, sep="\t", header=None, skiprows=100, names=["x", "y"])
        solver.fit(data.x, data.y)
    

    This performs relatively well with regard to your datasets.

    [Figures: fitted spectrograms (data, baseline, detected peaks, Lorentzian fit) for three sample files]
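
    Since the question also asks for the main peak position and the FWHM: for the Lorentzian peak_model above, the FWHM is simply twice the scale parameter. Below is a minimal sketch of a helper that pulls the main peak out of the optimized parameter vector; the name main_peak_summary and the 1300–1400 window are illustrative assumptions, and it presumes the fit method is modified to also return solution:

    import numpy as np

    def main_peak_summary(p, m=3, window=(1300.0, 1400.0)):
        """Return (position, FWHM, height) of the first fitted peak inside `window`.

        p is the flat parameter vector [loc, scale, height, loc, scale, height, ...]
        produced by the optimizer; for a Lorentzian (Cauchy) profile, FWHM = 2 * scale.
        """
        for i in range(0, len(p), m):
            loc, scale, height = p[i:i + m]
            if window[0] < loc < window[1]:
                # First peak whose location falls inside the window of interest
                return loc, 2.0 * scale, height
        # No peak found in the window
        return np.nan, np.nan, np.nan

    If fit is changed to return solution alongside the plot, the per-file results can then be collected, e.g. rows.append((file.name, *main_peak_summary(solution.x))), and turned into a DataFrame for export.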

    Tuning

    You can control the methodology in several ways (see the sketch after this list):

    • Choosing an appropriate baseline fitter (see pybaselines);
    • Adapting the educated initial guess to better drive the minimization;
    • Refining bounds to enforce better convergence;
    • Filtering peaks by prominence;
    • Adding a minus sign to the peak model if you are fitting downward peaks.
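
    For instance, the prominence keyword already exposed by fit can be lowered so that weaker peaks are also detected. The sketch below reuses the files list and solver from above; the value 200 is an arbitrary illustration, not a value tuned for your data:

    # Illustrative tuning sketch (the prominence value is an assumption):
    solver = SpectroscopySolver()

    for file in files:
        data = pd.read_csv(file, sep="\t", header=None, skiprows=100, names=["x", "y"])
        # Lower the prominence threshold so weaker peaks are also detected:
        solver.fit(data.x, data.y, prominence=200.)

    Changing the baseline fitter means replacing the baseline_fitter.asls(ydata) call inside fit with another pybaselines method (e.g. arpls), leaving the rest of the pipeline untouched.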