Search code examples
Tags: python, asynchronous, multiprocessing, pool

How to accumulate results from pool.apply_async call?


I want to make calls to pool.apply_async(func) and accumulate each result as soon as it is available, without the calls having to wait for each other.


import multiprocessing
import numpy as np

# Chromosome names and simulation numbers; every (chrName, simNum) pair is one task.
chrNames = ['chr1', 'chr2', 'chr3']
sims = list(range(1, 4))



def accumulate_chrBased_simBased_result(chrBased_simBased_result,accumulatedSignalArray,accumulatedCountArray):
    """Add one task's arrays into the running totals.

    Element 0 of ``chrBased_simBased_result`` is added to
    ``accumulatedSignalArray`` and element 1 to ``accumulatedCountArray``.
    The additions use augmented assignment (``+=``), so the accumulators must
    be mutable arrays (such as the NumPy arrays created in the main block) for
    the caller to see the change. Nothing is returned.
    """
    incoming = chrBased_simBased_result
    signal_part, count_part = incoming[0], incoming[1]

    accumulatedSignalArray += signal_part
    accumulatedCountArray += count_part


def func(chrName, simNum):
    """Build the per-task result for one (chromosome, simulation) pair.

    Prints ``"<chrName> <simNum>"``, then returns a two-element list:
    a length-10000 float array and a length-10000 int array, both filled
    with ``simNum``.
    """
    print('%s %d' % (chrName, simNum))

    vector_shape = (10000,)
    return [
        np.full(vector_shape, simNum, dtype=float),
        np.full(vector_shape, simNum, dtype=int),
    ]


if __name__ == '__main__':
    # Running totals that every task's arrays are added into.
    accumulatedSignalArray = np.zeros((10000,), dtype=float)
    accumulatedCountArray = np.zeros((10000,), dtype=int)

    # One worker process per CPU.
    pool = multiprocessing.Pool(multiprocessing.cpu_count())

    # Every (chromosome, simulation) pair, in the same order as nested loops would give.
    task_args = [(chrName, simNum) for chrName in chrNames for simNum in sims]
    for args in task_args:
        pending = pool.apply_async(func, args)
        # AsyncResult.get() blocks until *this* task has finished, so the next
        # task is only submitted afterwards: the tasks effectively run one by one.
        accumulate_chrBased_simBased_result(pending.get(), accumulatedSignalArray, accumulatedCountArray)

    pool.close()
    pool.join()

    print(accumulatedSignalArray)
    print(accumulatedCountArray)



With this code, each pool.apply_async call waits for the previous call to finish. Is there a way to get rid of this waiting?


Solution

  • You are calling result.get() on each iteration, which makes the main process wait for that task to finish before it submits the next one.

    Below is a working version. Its prints show that each result is accumulated once its "func" call is ready, and it adds random sleeps so that the tasks take noticeably different amounts of time.

    import multiprocessing
    import numpy as np
    from time import time, sleep
    from random import random
    
    # Chromosome names and simulation numbers; every (chrName, simNum) pair is one task.
    chrNames = ['chr1', 'chr2', 'chr3']
    sims = list(range(1, 4))
    
    
    
    def accumulate_chrBased_simBased_result(chrBased_simBased_result,accumulatedSignalArray,accumulatedCountArray):
        """Add one task's arrays into the running totals.

        Element 0 of ``chrBased_simBased_result`` is added to
        ``accumulatedSignalArray`` and element 1 to ``accumulatedCountArray``.
        The additions use augmented assignment (``+=``), so the accumulators must
        be mutable arrays (such as the NumPy arrays created in the main block) for
        the caller to see the change. Nothing is returned.
        """
        incoming = chrBased_simBased_result
        signal_part, count_part = incoming[0], incoming[1]

        accumulatedSignalArray += signal_part
        accumulatedCountArray += count_part
    
    
    def func(chrName, simNum, max_delay=5):
        """Build the per-task result for one (chromosome, simulation) pair.

        First sleeps for a random duration in ``[0, max_delay)`` seconds, so that
        tasks finish at visibly different times. Then prints
        ``"<chrName> <simNum>"`` (after the sleep, so the output reflects
        completion order). Returns a two-element list: a length-10000 float
        array and a length-10000 int array, both filled with ``simNum``.

        ``max_delay`` defaults to 5 seconds, the original demo delay, so
        existing ``apply_async(func, (chrName, simNum))`` calls are unchanged.
        Pass 0 to skip the artificial wait. It must be non-negative, because
        ``time.sleep`` rejects negative durations.
        """
        result = []
        sleep(random() * max_delay)
        signal_array = np.full((10000,), simNum, dtype=float)
        count_array = np.full((10000,), simNum, dtype=int)
        result.append(signal_array)
        result.append(count_array)
        print('%s %d' %(chrName,simNum))

        return result
    
    
    if __name__ == '__main__':

        # Running totals that every finished task's arrays are added into.
        accumulatedSignalArray = np.zeros((10000,), dtype=float)
        accumulatedCountArray = np.zeros((10000,), dtype=int)

        def on_ready(chrName, simNum, chrBased_simBased_result):
            """Fold one finished task's arrays into the accumulators.

            The pool invokes this as a callback in this (parent) process, on
            its single result-handler thread, as soon as each task succeeds.
            Calls therefore never overlap each other. The callback should
            return quickly, because it holds up delivery of further results,
            and it should not raise.
            """
            print('{} {} is ready'.format(chrName, simNum))
            accumulate_chrBased_simBased_result(chrBased_simBased_result,accumulatedSignalArray,accumulatedCountArray)

        numofProcesses = multiprocessing.cpu_count()
        pool = multiprocessing.Pool(numofProcesses)

        results = []
        for chrName in chrNames:
            for simNum in sims:
                # Bind chrName/simNum as lambda defaults so each callback keeps
                # its own task's values (avoids the late-binding closure pitfall).
                results.append(pool.apply_async(
                    func, (chrName, simNum,),
                    callback=lambda res, c=chrName, s=simNum: on_ready(c, s, res)))

        # All tasks are already submitted; these are still-pending AsyncResults.
        for i in results:
            print(i)

        # No busy-wait: close() stops new submissions, and join() blocks without
        # spinning until the workers and the result handler (which runs the
        # callbacks) have finished.
        pool.close()
        pool.join()

        # Callbacks only run for successful tasks. get() re-raises any worker
        # exception here instead of letting it be silently lost.
        for r in results:
            r.get()

        print(accumulatedSignalArray)
        print(accumulatedCountArray)