Search code examples
pythonnumpymultiprocessingpython-multiprocessing

Sharing Numpy array variables across processes - Python Multiprocessing


I'm working on some code which builds a spectrogram using the Goerztel method. The calculations are performed mostly using Numpy ndarrays. The final spectrogram is a 2D ndarray(e.g. 1536 x 828) which is built from an initial empty/zero ndarray and then updated with the result(a column vector) of the Goerztel algorithm, which is performed num_windows times.

I have experience with multithreading/parallel processing in other programming languages, C/Java, but am less familiar with it in Python. I have a multiprocess version of the code working but I feel like there is a more elegant/efficient way of doing it. From my understanding of the code and multiprocessing in Python, copies of some of the variables are used in each process(the transformed_cols and coefficients ndarrays), which I think can be avoided.

The reason I think this code is suitable for parallelism is that while writes are occuring to the same ndarray, there's no overlap of which part of the ndarray is being written to.

From reading other similar posts I failed to find one that fit my situation enough to solve my problem, so any help with this would be appreciated. I think the part that could be improved is the apply_async function call, I'm just unsure how :(

For what it's worth, compared to my serial solution, I'm seeing a speed-up of approx 3-3.5x with the below solution (on my machine)

def build_specific_spectrogram(signal: np.ndarray, 
                               sample_rate: int, 
                               filterbank: Filterbank,
                               analysis_window: AnalysisWindow,
                               time_spaces: list,
                               num_windows: int) -> np.ndarray:
    if :
        ## other spectrograms here
    elif filterbank.name == 'goertzel':
        spect = np.zeros((filterbank.num_bands, num_windows), dtype='complex_')
        transformed_cols = build_window_transformed_cols(analysis_window.data, signal, num_windows, analysis_window.window_overlap)

        coefficients = filterbank.band_frequencies / sample_rate * transformed_cols.shape[0]
        
        num_processes = mp.cpu_count()
        
        def update_spect(result, index):
            spect[:,index] = result
        
        pool = mp.Pool(processes=num_processes)
                    
        for win_index in range(num_windows-1):
            func_callback = partial(update_spect, index=win_index)
            pool.apply_async(build_goertzel_async, [win_index, transformed_cols, coefficients], callback=func_callback)
        pool.close()
        pool.join()
        return spect


def build_goertzel_async(win_index, transformed_cols, coefficients):
    signal_window = transformed_cols[:, win_index]
    window_powers = generalized_goertzel(signal_window, coefficients)
    return window_powers[:,]
           

def build_window_transformed_cols(analysis_window_data: np.ndarray, sample_window: np.ndarray, num_windows: int, window_overlap: float) -> np.ndarray:
    transformed_cols = np.zeros((len(analysis_window_data), num_windows - 1))
    s_index = 0
    e_index = len(analysis_window_data) 
    for win_index in range(num_windows-1):
        windowed_signal = sample_window[s_index:e_index]

        transformed_cols[:, win_index] = np.asarray([windowed_signal[i] * analysis_window_data[i] for i in range(len(windowed_signal))])
        s_index += window_overlap
        e_index += window_overlap
    return transformed_cols        

    
def generalized_goertzel(signal_window: np.ndarray, 
                         coefficients: np.ndarray) -> np.ndarray:
    signal_length = len(signal_window)
    signal_window = np.reshape(signal_window, (signal_length, 1), order='F')
    num_freqs = len(coefficients)
    powers = np.zeros((num_freqs), dtype = 'complex_')
    for freq_index in range(num_freqs):
        A = 2 * math.pi * (coefficients[freq_index] / signal_length)
        B = math.cos(A) * 2
        C = cmath.exp(A * -1j)
        s_0 = 0
        s_1 = 0
        s_2 = 0
        for i in range(0, signal_length-1):
            s_0 = signal_window[i] + B * s_1 - s_2
            s_2 = s_1
            s_1 = s_0
        s_0 = signal_window[signal_length - 1] + B * s_1 - s_2
        powers[freq_index] = s_0 - s_1 * C
        powers[freq_index] = powers[freq_index] * cmath.exp(A * (signal_length - 1) * -1j)
    return powers

Apologies in advance for not provided code that could be run, but that would require the full codebase which is a bit long for a stackoverflow post.


Solution

  • Just thought to provide an answer that worked. Big thanks to @Aarons comment and his previous post, was a big help.

    The structure has changed slightly but functionality is the same. An improvement was also seen in the execution time. Not as big as between the single process and the original multiprocess implementation, but still a good improvement.

    def goertzel_spectrogram_by_multiprocessing(signal: np.ndarray,
                                                filterbank: Filterbank, 
                                                num_windows: int, 
                                                analysis_window_data: list, 
                                                analysis_window_overlap: float,
                                                sample_rate: int) -> np.ndarray:
        spect_shm = shared_memory.SharedMemory(create=True, size=filterbank.num_bands * num_windows * 16)
        spect = np.ndarray((filterbank.num_bands, num_windows), dtype='complex_', buffer=spect_shm.buf)
            
        transformed_cols_shm = shared_memory.SharedMemory(create=True, size=len(analysis_window_data) * num_windows * 8)
        transformed_cols = np.ndarray((len(analysis_window_data), num_windows), buffer=transformed_cols_shm.buf)
        transformed_cols[:] = build_window_transformed_cols(analysis_window_data, signal, num_windows, analysis_window_overlap)
    
        coefficients_shm = shared_memory.SharedMemory(create=True, size=len(filterbank.band_frequencies) * 8)
        coefficients = np.ndarray((len(filterbank.band_frequencies)), dtype='float64', buffer=coefficients_shm.buf)
        coefficients[:] = filterbank.band_frequencies / sample_rate * transformed_cols.shape[0]
            
        cpu_count = mp.cpu_count()
        print('cpu_count = %d', cpu_count)
            
        in_q = mp.Queue()
        shm_names = {
            'spect': spect_shm,
            'cols': transformed_cols_shm,
            'coefficients': coefficients_shm
            }
        processes = [mp.Process(target=update_goertzel, args=(in_q, shm_names, spect.shape, transformed_cols.shape, coefficients.shape)) for _ in range(cpu_count)]
        
        for p in processes:
            p.start()
            
        for window_index in range(num_windows):
            in_q.put(window_index)
            
        for _ in processes:
            in_q.put(STOPFLAG())
                
        for p in processes:
            p.join()
            
        spect_copy = np.copy(spect) ## need to copy, since the close and unlink operations destroy the original
          
        spect_shm.close()
        spect_shm.unlink()
        transformed_cols_shm.close()
        transformed_cols_shm.unlink()
        coefficients_shm.close()
        coefficients_shm.unlink()
    
        return spect_copy
    
    
    class STOPFLAG: pass
    
    
    def update_goertzel(in_q, shm_names: dict, spect_shape, cols_shape, coefficients_shape):
        spect_shm = shm_names['spect']
        transformed_cols_shm = shm_names['cols']
        coefficients_shm = shm_names['coefficients']
        spect = np.ndarray(spect_shape, dtype='complex_', buffer=spect_shm.buf)
        transformed_cols = np.ndarray(cols_shape, dtype='float64', buffer=transformed_cols_shm.buf)
        coefficients = np.ndarray(coefficients_shape, dtype='float64', buffer=coefficients_shm.buf)
        while True:
            try:    
                window_index = in_q.get(1)
            except Empty:
                print('Tasks done, exitting')
                break
            if isinstance(window_index, STOPFLAG):
                print('Received STOPFLAG, exitting')
                break
            res = build_goertzel(window_index, transformed_cols, coefficients)
            spect[:,window_index] = res
    
    
    def build_goertzel(win_index, transformed_cols, coefficients):
        signal_window = transformed_cols[:, win_index]
        window_powers = generalized_goertzel(signal_window, coefficients)
        return window_powers[:,]
               
    
    def build_window_transformed_cols(analysis_window_data: np.ndarray,
                                      sample_window: np.ndarray,
                                      num_windows: int,
                                      window_overlap: float) -> np.ndarray:
        transformed_cols = np.zeros((len(analysis_window_data), num_windows ), dtype='float64')
        s_index = 0
        e_index = len(analysis_window_data) 
        for win_index in range(num_windows-1):
            windowed_signal = sample_window[s_index:e_index]
    
            transformed_cols[:, win_index] = np.asarray([windowed_signal[i] * analysis_window_data[i] for i in range(len(windowed_signal))])
            s_index += window_overlap
            e_index += window_overlap
        return transformed_cols        
    
    
    def generalized_goertzel(signal_window: np.ndarray, 
                             coefficients: np.ndarray) -> np.ndarray:
        signal_length = len(signal_window)
        signal_window = np.reshape(signal_window, (signal_length, 1), order='F')
        num_freqs = len(coefficients)
        powers = np.zeros((num_freqs), dtype = 'complex_')
        for freq_index in range(num_freqs):
            A = 2 * math.pi * (coefficients[freq_index] / signal_length)
            B = math.cos(A) * 2
            C = cmath.exp(A * -1j)
            s_0 = 0
            s_1 = 0
            s_2 = 0
            for i in range(0, signal_length-1):
                s_0 = signal_window[i] + B * s_1 - s_2
                s_2 = s_1
                s_1 = s_0
            s_0 = signal_window[signal_length - 1] + B * s_1 - s_2
            powers[freq_index] = s_0 - s_1 * C
            powers[freq_index] = powers[freq_index] * cmath.exp(A * (signal_length - 1) * -1j)
        return powers