Tags: python, numpy, python-multiprocessing, ray, charm++

Issues with parallelizing processing of numpy array


I am having an issue with my attempt at speeding up my program. In the serial Python version of my code, I compute the values of a function f(x), which returns a float, over sliding windows of a NumPy array, as shown below:

import numpy as np

a = np.array([i for i in range(1, 10000000)])  # some data here
N = 100  # window size
result = []
for i in range(N, len(a)):
    result.append(f(a[i - N:i]))  # f is slow and not vectorized
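
(Here f is just a placeholder for my real function. As a rough, hypothetical stand-in, imagine a slow, non-vectorized scalar function along these lines:)

import math

# Hypothetical stand-in for f(x): CPU-bound, not vectorized, and built
# from many individual per-element calls that can't be parallelized.
def f(x):
    total = 0.0
    for v in x:
        total += math.sin(v) ** 2  # stands in for the real per-element work
    return total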

Since the NumPy array is very large and f(x)'s runtime is high, I tried to apply multiprocessing to speed up my code. Through my research I found that charm4py might be a good solution: it has a Pool feature that breaks an array into chunks and distributes the work among spawned processes. I implemented charm4py's multiprocessing example and then translated it to my case:

from charm4py import charm

# Split the array into subarrays for sequential processing (takes only ~5 seconds)
a = np.array([a[i - N:i] for i in range(N, len(a))])
result = charm.pool.map(f, a, chunksize=512, ncores=-1)
# I'm running this code with "charmrun +p18 example.py"

The issue I encountered is that the code ran a lot slower, despite being executed on a more powerful instance (18 physical cores vs. 6 physical cores).

I expected to see roughly a 3x improvement, but that didn't happen. While searching for solutions, I learned that there can be significant overhead from expensive deserialization and from spinning up new processes, but I'm not sure whether that is the case here.
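
A quick back-of-the-envelope check (using the sizes above) suggests the pre-materialized windows alone could explain it, since every window is a separate copy and all of them have to be serialized to the worker processes:

import numpy as np

a = np.arange(1, 10000000)
N = 100

# Each length-N window is a copy, so the windowed array holds ~N times
# the original data -- and all of it gets pickled and shipped to workers.
original_mb = a.nbytes / 1e6                       # ~80 MB
windowed_mb = (len(a) - N) * N * a.itemsize / 1e6  # ~8,000 MB
print(f"original: {original_mb:.0f} MB, windows: {windowed_mb:.0f} MB")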

I would really appreciate any feedback or suggestions on how to implement fast parallel processing of a NumPy array, assuming that f(x) is not vectorized, takes a long time to compute, and internally makes a large number of individual calls that cannot themselves be parallelized.

Thank you!


Solution

  • It sounds like you're trying to parallelize this operation with either Charm or Ray (it's not clear how you would use both together).

    If you choose to use Ray, and your data is a numpy array, you can take advantage of zero-copy reads to avoid any deserialization overhead.
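
    As a minimal sketch of that zero-copy behavior (assuming a local, single-machine ray.init()):

    import numpy as np
    import ray

    ray.init()

    arr = np.arange(10000000)
    ref = ray.put(arr)    # serialized into the shared-memory object store once
    view = ray.get(ref)   # zero-copy: a read-only numpy array backed by that store
    print(view.flags.writeable)  # False -- tasks read it without deserializing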

    You may want to optimize your sliding window function a bit, but it will likely look like this:

    import ray
    import numpy as np

    @ray.remote
    def apply_rolling(f, arr, start, end, window_size):
        results_arr = []
        # Note: windows that straddle batch boundaries are skipped here;
        # overlap start/end by window_size if you need every window.
        for i in range(start, end - window_size):
            results_arr.append(f(arr[i : i + window_size]))
        return np.array(results_arr)
    

    Note that this structure lets us call f multiple times within a single task (aka batching).

    To use our function:

    # Some small setup (assumes ray.init() has already been called)
    big_arr = np.arange(10000000)
    big_arr_ref = ray.put(big_arr)  # store the array in the object store once

    batch_size = len(big_arr) // int(ray.available_resources()["CPU"])
    window_size = 100

    # Kick off our tasks
    result_refs = []
    for i in range(0, len(big_arr), batch_size):
        end_point = min(i + batch_size, len(big_arr))
        ref = apply_rolling.remote(f, big_arr_ref, i, end_point, window_size)
        result_refs.append(ref)
    
    
    # Handle the results
    flattened = []
    for section in ray.get(result_refs):
        flattened.extend(section)
    

    I'm sure you'll want to customize this code, but here are two important properties that you'll likely want to maintain.

    Batching: We use batching to avoid starting too many tasks. In any system, parallelizing incurs overhead, so we want to be careful not to launch more tasks than the parallelism can pay for. Setting batch_size = len(big_arr) // int(ray.available_resources()["CPU"]) gives us roughly one batch per CPU (integer division can leave one small remainder batch), as shown in the sketch below.
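
    For example, plugging in hypothetical numbers (10,000,000 elements on an 18-CPU machine):

    n_cpus = 18                             # e.g. int(ray.available_resources()["CPU"])
    n_elems = 10000000
    batch_size = n_elems // n_cpus          # 555,555
    n_batches = -(-n_elems // batch_size)   # ceil division: 19 (18 full + 1 small remainder)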

    Shared memory: Since Ray's object store supports zero-copy reads of numpy arrays, calling ray.get on (or reading from) such an array is essentially free on a single machine, where there are no network costs. There is some overhead in serializing and calling ray.put, though, so this approach calls put (the expensive operation) only once, while ray.get (called implicitly when a task receives an ObjectRef argument) happens many times.

    Tip: Be careful when passing large arrays directly as parameters to remote functions. Ray will call ray.put on the argument for each task, even if you pass the same object every time.
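
    A hypothetical sketch of the difference, reusing apply_rolling from above:

    # Anti-pattern: passing the array itself re-serializes it for every task.
    for i in range(0, len(big_arr), batch_size):
        end = min(i + batch_size, len(big_arr))
        apply_rolling.remote(f, big_arr, i, end, window_size)      # implicit put per task

    # Better: pass the ObjectRef so every task reads the single stored copy.
    for i in range(0, len(big_arr), batch_size):
        end = min(i + batch_size, len(big_arr))
        apply_rolling.remote(f, big_arr_ref, i, end, window_size)  # zero-copy read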