Tags: python, parallel-processing, pool

Cannot use multiprocessing.Pool for recursive function running in parallel


I want to compute the sum of all elements in an array in parallel and recursively, and I am using multiprocessing to implement this idea.

[Figure: recursive tree illustration]

Only 3 processes are allowed to run at the same time on this computer. Operations shown in rectangles of the same color execute simultaneously in different processes.

This is my code (unrelated parts removed):

import multiprocessing as mp
import random

def randList(len):
    l = []
    for _ in range(len):
        l.append(random.randint(1, 100))
    return l

def init_pool(sharedArr_):
    global sharedArr
    sharedArr = sharedArr_


class ParallelSum:
    def solve(self, l, r):
        if l >= r:
            return sharedArr[l]

        m = l + (r - l) // 2

        sumL = self.pool.apply_async(self.solve, args=(l, m)) # compute left part of the array
        sumR = self.pool.apply_async(self.solve, args=(m + 1, r)) # compute right part of the array

        return sumL.get() + sumR.get()

    def computeSum(self, a, l, r):
        sharedArr = mp.Array("i", a)

        self.pool = mp.Pool(initializer=init_pool, initargs=(sharedArr,))

        result = self.pool.apply_async(self.solve, args=(l, r))

        print(result.get()) # Error occurs here
        self.pool.close()
        self.pool.join()


if __name__ == "__main__":
    a = randList(int(5000000)) # generating a list of random integers

    parallel_sum = ParallelSum()

    res = parallel_sum.computeSum(a, 0, len(a) - 1)

    print(res)

When I run it, I get this error message: "pool objects cannot be passed between processes or pickled"

Here is the traceback:

  File "...\parallel_sum.py", line 74, in <module>
    res = parallel_sum.computeSum(a, 0, len(a) - 1)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\parallel_sum.py", line 50, in computeSum
    print(result.get())
          ^^^^^^^^^^^^
  File "...\miniconda3\Lib\multiprocessing\pool.py", line 774, in get
    raise self._value
  File "...\miniconda3\Lib\multiprocessing\pool.py", line 540, in _handle_tasks
    put(task)
  File "...\miniconda3\Lib\multiprocessing\connection.py", line 205, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\miniconda3\Lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "...\miniconda3\Lib\multiprocessing\pool.py", line 643, in __reduce__
    raise NotImplementedError(
NotImplementedError: pool objects cannot be passed between processes or pickled

I guess it is not possible to recursively fork a new process.

How can I get on the right track? I am relatively new to parallel programming and to this Python library.


Solution

  • The error is telling you what is wrong; it's just a little jargony. To serialize ("pickle") self.solve so it can be sent to the worker processes, Python has to serialize self. To serialize self, it has to serialize the attributes of self, including self.pool, which is a multiprocessing.Pool and cannot be serialized (workers in a pool cannot receive ownership of the pool in order to dispatch tasks to themselves).

    Recursive code isn't well supported in Python to begin with (the interpreter does not optimize tail recursion, so deep recursion keeps growing the stack), and it gets even uglier when you try to make it work with a multiprocessing.Pool. Don't do it this way.
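
One possible non-recursive alternative, offered only as a minimal sketch and not as the answerer's own code: split the index range into one contiguous chunk per worker, let each worker sum its chunk, and add the partial sums in the parent. Only module-level functions are submitted to the pool, so nothing that holds a reference to the Pool has to be pickled. The names init_worker, chunk_bounds, partial_sum and parallel_sum are illustrative assumptions, as is the default of 3 workers taken from the question.

```python
import multiprocessing as mp
import random


def init_worker(shared_arr):
    # Runs once in each worker process; keeps the shared array in a global.
    global _shared
    _shared = shared_arr


def chunk_bounds(n, chunks):
    # Split range(n) into at most `chunks` contiguous half-open (lo, hi) ranges.
    step = max(1, -(-n // chunks))  # ceiling division, at least 1
    return [(lo, min(lo + step, n)) for lo in range(0, n, step)]


def partial_sum(bounds):
    # Sum one slice of the shared array; executed inside a worker.
    lo, hi = bounds
    return sum(_shared[lo:hi])


def parallel_sum(values, workers=3):
    # lock=False: the workers only read the array, so no lock is needed.
    shared = mp.Array("i", values, lock=False)
    with mp.Pool(workers, initializer=init_worker, initargs=(shared,)) as pool:
        # Tasks are plain tuples of ints; the pool itself is never pickled.
        return sum(pool.map(partial_sum, chunk_bounds(len(values), workers)))


if __name__ == "__main__":
    data = [random.randint(1, 100) for _ in range(100_000)]
    print(parallel_sum(data) == sum(data))
```

The parent does the final combine step, which replaces the upper levels of the recursion tree. If more levels of parallelism are really wanted, the same idea can be repeated on the list of partial sums, but for a plain sum one round is usually enough.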