python, parallel-processing, concurrent.futures

How do you parallelize access to a shared array in Python using concurrent.futures?


I have the following piece of code to illustrate my problem.

Each thread calculates a value locs and then the result array is updated. Assume that the update (result[locs] += mask[locs]) is a very slow operation. How can I parallelize it so that it runs in the threads too?

import numpy as np
import time
import concurrent.futures

MAX  = 100
SIZE = 500
mask = np.random.randint(0, MAX, (SIZE, SIZE))

def process_image(i):
    start = time.time()
    locs = np.where(mask > i)
    print(f"    process_image({i}) took {round(time.time() - start, 2)} secs.")
    return locs

if __name__ == '__main__':

    result = np.zeros((SIZE, SIZE))
     
    with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
        results = [executor.submit(process_image, i) for i in range(MAX)]
        for f in concurrent.futures.as_completed(results):
            locs = f.result()
            # How do I parallelize this operation, where the result of each thread updates a shared result array?
            result[locs] += mask[locs]

    print(result)

Solution

  • You're using multithreading, so the result ndarray is naturally available to all threads.

    However, result[locs] += mask[locs] may not be thread-safe: it is a read-modify-write, so if another thread runs between the read and the write, one of the updates can be lost. You would therefore need to protect the result object with a Lock.

    Assuming that your real implementation of process_image() does some I/O, you could make the following changes:

    import numpy as np
    import time
    import concurrent.futures
    from threading import Lock
    
    MAX  = 100
    SIZE = 500
    mask = np.random.randint(0, MAX, (SIZE, SIZE))
    LOCK = Lock()
    
    def process_image(i):
        start = time.time()
        locs = np.where(mask > i)
        print(f"    process_image({i}) took {round(time.time() - start, 2)} secs.")
        # Only one thread at a time may update the shared result array
        with LOCK:
            result[locs] += mask[locs]
    
    if __name__ == '__main__':
    
        result = np.zeros((SIZE, SIZE))
         
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Consume the iterator so that any exception raised in a worker surfaces here
            list(executor.map(process_image, range(MAX)))
    
        print(result)
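
The lock in the code above makes the update safe, but it also means only one thread at a time can run result[locs] += mask[locs]. If the update itself is the slow part, one possible alternative is to give each worker its own private array and add the partial arrays together at the end. The following is only a sketch under assumptions: accumulate_chunk, run and N_WORKERS are hypothetical names that are not in the original code, and whether threads actually speed this up depends on how much of the real work releases the GIL.

```python
import concurrent.futures

import numpy as np

MAX = 100
SIZE = 500
N_WORKERS = 4  # hypothetical worker count, chosen only for illustration

mask = np.random.randint(0, MAX, (SIZE, SIZE))


def accumulate_chunk(thresholds):
    """Apply the updates for a chunk of thresholds to a private array."""
    partial = np.zeros((SIZE, SIZE))
    for i in thresholds:
        locs = np.where(mask > i)
        # partial belongs to this call only, so no lock is needed here
        partial[locs] += mask[locs]
    return partial


def run():
    # Split the thresholds 0..MAX-1 into one chunk per worker
    chunks = np.array_split(np.arange(MAX), N_WORKERS)
    with concurrent.futures.ThreadPoolExecutor(max_workers=N_WORKERS) as executor:
        partials = list(executor.map(accumulate_chunk, chunks))
    # Combine the private arrays in the main thread
    return np.sum(partials, axis=0)


if __name__ == '__main__':
    result = run()
    # A cell holding value v satisfies mask > i for i = 0..v-1, so it
    # receives v additions of v and should end up equal to v * v.
    print(np.array_equal(result, mask.astype(float) ** 2))
```

The final check works for this toy example because the expected result is known in closed form. With a real process_image() you would compare against a single-threaded run instead. The trade-off is memory: each worker holds a full SIZE x SIZE array.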