So I've gotten the code to work, but it's extremely slow and doesn't give the proper full width at half maximum (FWHM) values. Is there something I can do to get the FWHM values and speed up the processing without losing post-processing quality?
```python
# -*- coding: utf-8 -*-
"""
Created on Fri Feb 16 11:38:14 2024
@author: tppoirier96
"""
# importing packages
import pandas as pd
import glob
import numpy as np
from scipy.optimize import leastsq
import matplotlib.pyplot as plt
folder_path = 'folder_path'
# Locating folder with spectral data
file_list = glob.glob(folder_path + "/*.txt")
# Taking all text files from folder
main_dataframe = pd.DataFrame(pd.read_table(file_list[0]))
# Compiling dataframe for each spectrum
#SpectraType = str(input('What kind of spectral data is this?'))
for i in range(1, len(file_list)):
    # For all remaining files in the glob (the first file is already loaded above)
    data = pd.read_table(file_list[i], sep="\t")
    # Read each text file as dataframe
    df = pd.DataFrame(data)
    # Convert each file to a dataframe
    main_dataframe = pd.concat([main_dataframe, df], axis=1)
    # Append each dataframe to larger dataframe

DataDF = pd.concat([main_dataframe.iloc[67:,0], main_dataframe.iloc[67:,1::2]], axis=1)
# Cutting off first 110 wavenumbers due to substantial noise
# Also removing the wavenumbers for all but the first spectra
# Compiling each modified spectrum
SumDataDF = DataDF.describe()
# Summary for tracking important numbers
DataNorm = pd.DataFrame()
# Initializing new dataframe
DataLorentz = pd.DataFrame()
# Initializing new dataframe
DataNorm = (DataDF-DataDF.min())/(DataDF.max()-DataDF.min())
# Normalizing data
DataNorm = pd.concat([DataDF.iloc[:,0], DataNorm], axis=1)
# Adding wavenumbers back for visualization
xData = DataNorm.iloc[:,0].array
# Initializing wavenumber array
E2GArray = np.zeros((4,len(file_list)))
# Initializing E2G location array
# Standard Lorentzian function
def norm_lorentzian(x, x0, a, gamma):
    return a * gamma**2 / (gamma**2 + (x - x0)**2)

# For multiple peaks
def multi_lorentz(x, params):
    off = params[0]
    paramsRest = params[1:]
    assert not (len(paramsRest) % 3)
    return off + sum([norm_lorentzian(x, *paramsRest[i:i+3])
                      for i in range(0, len(paramsRest), 3)])

# Residuals for least squares regression
def res_multi_lorentz(params, xData, yData):
    diff = [multi_lorentz(x, params) - y for x, y in zip(xData, yData)]
    return diff
# Begin processing
for i in range(1, len(DataNorm.columns)-1):
    yData = DataNorm.iloc[:,i].array
    # Converting spectra intensities to convenient type
    #NumPeaks = int(input('How many peaks to fit?\n'))
    # Collecting number of peaks to fit
    counter = 0
    # Break condition
    generalWidth = 10
    # Starting HWHM
    yDataLoc = yData
    # Duplicating intensities for convenience
    startValues = [max(DataNorm.iloc[:,i])]
    # Taking the maximum (should be 1.0)
    while max(yDataLoc) - min(yDataLoc) > .1:
        # I'm not sure about this condition
        counter += 1
        # Break condition increment
        print(str(counter) + " while")
        if counter > 20:
            print("Reached max iterations!")
            break
        minP = np.argmin(yDataLoc)
        # Index of minimum intensity (what wavenumber the baseline is)
        minY = yData[minP]
        # What the minimum intensity is
        x0 = xData[minP]
        # Setting the initial E2G position as the minimum
        startValues += [x0, minY - max(yDataLoc), generalWidth]
        # Appending values to feed to the lorentz function
        popt, ier = leastsq(res_multi_lorentz, startValues, args=(xData, yData))
        # Returns result of optimization and integer flag for solution
        # popt is a concatenation of final values similar to startValues
        # The final popt values are fed to the line below for each spectral intensity set
        yDataLoc = [y - multi_lorentz(x, popt) for x, y in zip(xData, yData)]
        # Appends difference between y-values and lorentzian curves
        if 1300 < x0 < 1400:
            # If the E2G is found, append it to the array for data compilation
            E2GArray[0,i-1] = i
            # Tracking which data file it came from
            E2GArray[2,i] = x0
            # Storing location of E2G
            E2GArray[3,i] = generalWidth*2
            # Storing FWHM for later
    print(str(i) + " for")
    testData = [multi_lorentz(x, popt) for x in xData]
    # Makes array of lorentzian fits
    DataLorentz = pd.concat([DataLorentz, pd.Series(testData)], axis=1)
    fig = plt.figure()
    ax = fig.add_subplot(1, 1, 1)
    ax.plot(xData, yData, label="Data")
    ax.plot(xData, testData, label="Lorentz")
    plt.xlabel("Wavenumber")
    plt.ylabel("Intensity relative to E2G")
    plt.legend(loc="upper left")
    plt.show()
# creating a new csv file with the dataframe we created
# cwd = os.getcwd()
# print(cwd)
#main_dataframe.to_excel('Mapped Data.xlsx')
```
I would like to be able to process approximately 4000 files quickly; currently it is projected to take about a week. I would also like to store the position of the main peak (expected between 1300 and 1400, in the inner-most if statement) as well as the FWHM, hence the fourth row of E2GArray.
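For reference, the fitted half-width is already contained in popt, so it could be read back instead of storing the fixed generalWidth*2. This is a minimal sketch only, assuming the [offset, x0, a, gamma, x0, a, gamma, ...] layout built in startValues above; the helper name is hypothetical:

```python
# Hypothetical helper: recover peak position and FWHM from the fitted
# parameters rather than from the initial generalWidth guess.
# popt layout (as built above): [offset, x0, a, gamma, x0, a, gamma, ...]
def e2g_from_popt(popt, low=1300, high=1400):
    for j in range(1, len(popt), 3):
        x0, a, gamma = popt[j:j + 3]
        if low < x0 < high:
            return x0, 2 * abs(gamma)  # FWHM of a Lorentzian is 2 * HWHM
    return None, None
```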
There are a few challenges to take into account: the signal has a baseline that must be removed, the number of peaks and their positions are not known in advance, and the whole procedure then has to be automated for each file. The following class implements the methodology:
```python
import inspect
import itertools
import pathlib

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pybaselines import Baseline
from scipy import optimize, stats, signal


class SpectroscopySolver:

    @staticmethod
    def peak_model(x, loc, scale, height):
        """Lorentzian peak"""
        law = stats.cauchy(loc=loc, scale=scale)
        return height * law.pdf(x) / law.pdf(loc)

    @classmethod
    def m(cls):
        """Number of model parameters"""
        return len(inspect.signature(cls.peak_model).parameters) - 1

    @classmethod
    def model(cls, x, *p):
        """Variable peak model"""
        m = cls.m()
        n = len(p) // m
        y = np.zeros_like(x)
        for i in range(n):
            y += cls.peak_model(x, *p[i * m:(i + 1) * m])
        return y

    @classmethod
    def loss_factory(cls, xdata, ydata):
        """Least Square Factory"""
        def wrapped(p):
            """Least Square Loss function"""
            return 0.5 * np.sum(np.power(ydata - cls.model(xdata, *p), 2))
        return wrapped

    @classmethod
    def fit(cls, xdata, ydata, prominence=400.):
        """Fitting methodology"""
        # Baseline management:
        baseline_fitter = Baseline(x_data=xdata)
        baseline, params = baseline_fitter.asls(ydata)
        yb = ydata - baseline

        # Peak identifications:
        peaks, bases = signal.find_peaks(yb, prominence=prominence)
        lefts, rights = bases["left_bases"], bases["right_bases"]

        # Initial guess tuning:
        x0 = list(itertools.chain(*[[xdata[i], 0.1, p] for i, p in zip(peaks, bases["prominences"])]))
        bounds = list(itertools.chain(*[
            [(xdata[lefts[i]], xdata[rights[i]])] + [(0., np.inf)] * (cls.m() - 1) for i in range(len(peaks))
        ]))

        # Least Square Loss Minimization:
        loss = cls.loss_factory(xdata, yb)
        solution = optimize.minimize(loss, x0=x0, bounds=bounds)

        # Estimate:
        yhat = cls.model(xdata, *solution.x)
        ybhat = yhat + baseline

        # Plot:
        fig, axe = plt.subplots()
        axe.scatter(xdata, ydata, marker=".", color="gray", label="Data")
        axe.scatter(xdata[peaks], ydata[peaks], label="Peaks")
        axe.plot(xdata, baseline, "--", label="Baseline")
        axe.plot(xdata, ybhat, label="Fit")
        axe.set_title("Spectrogram")
        axe.set_xlabel(r"Wavenumber, $\nu$")
        axe.set_ylabel(r"Raw Signal, $g(\nu)$")
        axe.legend()
        axe.grid()

        return axe
```
Then it suffices to automate the process for each file:
```python
files = list(pathlib.Path("raman/").glob("*.txt"))
solver = SpectroscopySolver()

for file in files:
    data = pd.read_csv(file, sep="\t", header=None, skiprows=100, names=["x", "y"])
    solver.fit(data.x, data.y)
```
This performs relatively well with regard to your datasets. You can control the methodology mainly through the prominence threshold passed to fit (how pronounced a peak must be to be kept) and through the baseline correction performed by asls.
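Since the original goal also includes storing the E2G position and its FWHM, a possible post-processing sketch is shown below. It is an assumption on my part, not part of the class above: it supposes fit is modified to also return solution. Because peak_model is a Cauchy/Lorentzian, scale is the half width at half maximum, so FWHM = 2 * scale:

```python
def peak_table(solution_x, m=3):
    """Hypothetical helper: turn the flat parameter vector
    [loc, scale, height, loc, scale, height, ...] into a table with FWHM."""
    rows = []
    for j in range(0, len(solution_x), m):
        loc, scale, height = solution_x[j:j + m]
        rows.append({"position": loc, "fwhm": 2 * scale, "height": height})
    return pd.DataFrame(rows)

# Example usage, assuming `solution` is returned by a modified fit():
# table = peak_table(solution.x)
# e2g = table[(table.position > 1300) & (table.position < 1400)]
```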