I'm working on some code which builds a spectrogram using the Goerztel method. The calculations are performed mostly using Numpy ndarrays. The final spectrogram is a 2D ndarray(e.g. 1536 x 828) which is built from an initial empty/zero ndarray and then updated with the result(a column vector) of the Goerztel algorithm, which is performed num_windows
times.
I have experience with multithreading/parallel processing in other programming languages, C/Java, but am less familiar with it in Python. I have a multiprocess version of the code working but I feel like there is a more elegant/efficient way of doing it. From my understanding of the code and multiprocessing in Python, copies of some of the variables are used in each process(the transformed_cols
and coefficients
ndarrays), which I think can be avoided.
The reason I think this code is suitable for parallelism is that while writes are occuring to the same ndarray, there's no overlap of which part of the ndarray is being written to.
From reading other similar posts I failed to find one that fit my situation enough to solve my problem, so any help with this would be appreciated. I think the part that could be improved is the apply_async function call, I'm just unsure how :(
For what it's worth, compared to my serial solution, I'm seeing a speed-up of approx 3-3.5x with the below solution (on my machine)
def build_specific_spectrogram(signal: np.ndarray,
sample_rate: int,
filterbank: Filterbank,
analysis_window: AnalysisWindow,
time_spaces: list,
num_windows: int) -> np.ndarray:
if :
## other spectrograms here
elif filterbank.name == 'goertzel':
spect = np.zeros((filterbank.num_bands, num_windows), dtype='complex_')
transformed_cols = build_window_transformed_cols(analysis_window.data, signal, num_windows, analysis_window.window_overlap)
coefficients = filterbank.band_frequencies / sample_rate * transformed_cols.shape[0]
num_processes = mp.cpu_count()
def update_spect(result, index):
spect[:,index] = result
pool = mp.Pool(processes=num_processes)
for win_index in range(num_windows-1):
func_callback = partial(update_spect, index=win_index)
pool.apply_async(build_goertzel_async, [win_index, transformed_cols, coefficients], callback=func_callback)
pool.close()
pool.join()
return spect
def build_goertzel_async(win_index, transformed_cols, coefficients):
signal_window = transformed_cols[:, win_index]
window_powers = generalized_goertzel(signal_window, coefficients)
return window_powers[:,]
def build_window_transformed_cols(analysis_window_data: np.ndarray, sample_window: np.ndarray, num_windows: int, window_overlap: float) -> np.ndarray:
transformed_cols = np.zeros((len(analysis_window_data), num_windows - 1))
s_index = 0
e_index = len(analysis_window_data)
for win_index in range(num_windows-1):
windowed_signal = sample_window[s_index:e_index]
transformed_cols[:, win_index] = np.asarray([windowed_signal[i] * analysis_window_data[i] for i in range(len(windowed_signal))])
s_index += window_overlap
e_index += window_overlap
return transformed_cols
def generalized_goertzel(signal_window: np.ndarray,
coefficients: np.ndarray) -> np.ndarray:
signal_length = len(signal_window)
signal_window = np.reshape(signal_window, (signal_length, 1), order='F')
num_freqs = len(coefficients)
powers = np.zeros((num_freqs), dtype = 'complex_')
for freq_index in range(num_freqs):
A = 2 * math.pi * (coefficients[freq_index] / signal_length)
B = math.cos(A) * 2
C = cmath.exp(A * -1j)
s_0 = 0
s_1 = 0
s_2 = 0
for i in range(0, signal_length-1):
s_0 = signal_window[i] + B * s_1 - s_2
s_2 = s_1
s_1 = s_0
s_0 = signal_window[signal_length - 1] + B * s_1 - s_2
powers[freq_index] = s_0 - s_1 * C
powers[freq_index] = powers[freq_index] * cmath.exp(A * (signal_length - 1) * -1j)
return powers
Apologies in advance for not provided code that could be run, but that would require the full codebase which is a bit long for a stackoverflow post.
Just thought to provide an answer that worked. Big thanks to @Aarons comment and his previous post, was a big help.
The structure has changed slightly but functionality is the same. An improvement was also seen in the execution time. Not as big as between the single process and the original multiprocess implementation, but still a good improvement.
def goertzel_spectrogram_by_multiprocessing(signal: np.ndarray,
filterbank: Filterbank,
num_windows: int,
analysis_window_data: list,
analysis_window_overlap: float,
sample_rate: int) -> np.ndarray:
spect_shm = shared_memory.SharedMemory(create=True, size=filterbank.num_bands * num_windows * 16)
spect = np.ndarray((filterbank.num_bands, num_windows), dtype='complex_', buffer=spect_shm.buf)
transformed_cols_shm = shared_memory.SharedMemory(create=True, size=len(analysis_window_data) * num_windows * 8)
transformed_cols = np.ndarray((len(analysis_window_data), num_windows), buffer=transformed_cols_shm.buf)
transformed_cols[:] = build_window_transformed_cols(analysis_window_data, signal, num_windows, analysis_window_overlap)
coefficients_shm = shared_memory.SharedMemory(create=True, size=len(filterbank.band_frequencies) * 8)
coefficients = np.ndarray((len(filterbank.band_frequencies)), dtype='float64', buffer=coefficients_shm.buf)
coefficients[:] = filterbank.band_frequencies / sample_rate * transformed_cols.shape[0]
cpu_count = mp.cpu_count()
print('cpu_count = %d', cpu_count)
in_q = mp.Queue()
shm_names = {
'spect': spect_shm,
'cols': transformed_cols_shm,
'coefficients': coefficients_shm
}
processes = [mp.Process(target=update_goertzel, args=(in_q, shm_names, spect.shape, transformed_cols.shape, coefficients.shape)) for _ in range(cpu_count)]
for p in processes:
p.start()
for window_index in range(num_windows):
in_q.put(window_index)
for _ in processes:
in_q.put(STOPFLAG())
for p in processes:
p.join()
spect_copy = np.copy(spect) ## need to copy, since the close and unlink operations destroy the original
spect_shm.close()
spect_shm.unlink()
transformed_cols_shm.close()
transformed_cols_shm.unlink()
coefficients_shm.close()
coefficients_shm.unlink()
return spect_copy
class STOPFLAG: pass
def update_goertzel(in_q, shm_names: dict, spect_shape, cols_shape, coefficients_shape):
spect_shm = shm_names['spect']
transformed_cols_shm = shm_names['cols']
coefficients_shm = shm_names['coefficients']
spect = np.ndarray(spect_shape, dtype='complex_', buffer=spect_shm.buf)
transformed_cols = np.ndarray(cols_shape, dtype='float64', buffer=transformed_cols_shm.buf)
coefficients = np.ndarray(coefficients_shape, dtype='float64', buffer=coefficients_shm.buf)
while True:
try:
window_index = in_q.get(1)
except Empty:
print('Tasks done, exitting')
break
if isinstance(window_index, STOPFLAG):
print('Received STOPFLAG, exitting')
break
res = build_goertzel(window_index, transformed_cols, coefficients)
spect[:,window_index] = res
def build_goertzel(win_index, transformed_cols, coefficients):
signal_window = transformed_cols[:, win_index]
window_powers = generalized_goertzel(signal_window, coefficients)
return window_powers[:,]
def build_window_transformed_cols(analysis_window_data: np.ndarray,
sample_window: np.ndarray,
num_windows: int,
window_overlap: float) -> np.ndarray:
transformed_cols = np.zeros((len(analysis_window_data), num_windows ), dtype='float64')
s_index = 0
e_index = len(analysis_window_data)
for win_index in range(num_windows-1):
windowed_signal = sample_window[s_index:e_index]
transformed_cols[:, win_index] = np.asarray([windowed_signal[i] * analysis_window_data[i] for i in range(len(windowed_signal))])
s_index += window_overlap
e_index += window_overlap
return transformed_cols
def generalized_goertzel(signal_window: np.ndarray,
coefficients: np.ndarray) -> np.ndarray:
signal_length = len(signal_window)
signal_window = np.reshape(signal_window, (signal_length, 1), order='F')
num_freqs = len(coefficients)
powers = np.zeros((num_freqs), dtype = 'complex_')
for freq_index in range(num_freqs):
A = 2 * math.pi * (coefficients[freq_index] / signal_length)
B = math.cos(A) * 2
C = cmath.exp(A * -1j)
s_0 = 0
s_1 = 0
s_2 = 0
for i in range(0, signal_length-1):
s_0 = signal_window[i] + B * s_1 - s_2
s_2 = s_1
s_1 = s_0
s_0 = signal_window[signal_length - 1] + B * s_1 - s_2
powers[freq_index] = s_0 - s_1 * C
powers[freq_index] = powers[freq_index] * cmath.exp(A * (signal_length - 1) * -1j)
return powers