Search code examples
pythonmultiprocessingpicklepython-multiprocessing

How to start and stop multiple child processes from a class?


The Python program:

import multiprocessing
import time


class Application:

    def __init__(self):
        self._event = multiprocessing.Event()
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        while not self._event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        self._event.set()
        for process in self._processes:
            process.join()


if __name__ == '__main__':
    application = Application()
    application.start()
    time.sleep(3)
    application.stop()

Its output:

starting
starting
Traceback (most recent call last):
  File "/Users/maggyero/Desktop/application.py", line 31, in <module>
    application.start()
  File "/Users/maggyero/Desktop/application.py", line 21, in start
    process.start()
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory

In the function Application.__init__, each call multiprocessing.Process(target=self._worker) initializes a multiprocessing.Process instance with the instance method self._worker as its target argument. self._worker is bound to self which has the instance attribute self._processes.

In the function Application.start, each call process.start() serialises the target argument and therefore self._processes. self._processes is a list of multiprocessing.Process instances, initially not started yet. The first call process.start() starts the first multiprocessing.Process instance in that list without issue, but the second call process.start() fails.

So a started multiprocessing.Process instance cannot be serialised. How to solve that problem?


Solution

  • The root of the problem is that the start method of a multiprocessing.Process instance sets its _popen instance attribute to a multiprocessing.popen_*.Popen instance. The initialization of that instance performs these two steps (among others):

    1. For a multiprocessing.popen_spawn_posix.Popen instance, a multiprocessing.popen_spawn_win32.Popen instance, or a multiprocessing.popen_forkserver.Popen instance but not a multiprocessing.popen_fork.Popen instance (i.e. for the start method 'spawn' or the start method 'forkserver' but not the start method 'fork'), it serialises the multiprocessing.Process instance for writing it to the end of the pipe used by the parent process to communicate with the child process so that the child process can execute the run method of the multiprocessing.Process instance.

    2. It sets its finalizer instance attribute to a multiprocessing.util.Finalize instance which itself sets its _weakref instance attribute to a weakref.ref instance for closing at interpreter exit the ends of the pipes used by the parent process to communicate with the child process. In other words, it makes the multiprocessing.Process instance hold a weak reference.

    Thus if a multiprocessing.Process instance holds a reference to a started multiprocessing.Process instance then it holds a weak reference (point 2), so starting it will fail since it will serialise (point 1) the weak reference and weak references are not serialisable:

    import multiprocessing
    
    if __name__ == '__main__':
        multiprocessing.set_start_method('spawn')  # or 'forkserver' but not 'fork'
        process_a = multiprocessing.Process()
        process_b = multiprocessing.Process()
        process_b.foo = process_a
        process_a.start()  # creates process_a._popen.finalizer._weakref
        process_b.start()  # TypeError: cannot pickle 'weakref' object
    

    A minimal Python program showing the serialisation issue:

    import pickle
    import weakref
    
    pickle.dumps(weakref.ref(int))  # TypeError: cannot pickle 'weakref' object
    

    Two workarounds that avoid the serialisation issue:

    • either make the target argument a classmethod so that it is not bound to self (and in particular to the instance attribute self._processes):
    class Application:
    
        def __init__(self):
            self._event = multiprocessing.Event()
            self._processes = [
                multiprocessing.Process(target=self._worker, args=(self._event,))
                for _ in range(multiprocessing.cpu_count())]
    
        @classmethod
        def _worker(self, event):
            while not self._event.is_set():
                print(multiprocessing.current_process().name)
                time.sleep(1)
    
        def start(self):
            for process in self._processes:
                print('starting')
                process.start()
    
        def stop(self):
            self._event.set()
            for process in self._processes:
                process.join()
    
    • or exclude specifically the instance attribute self._processes from the serialisation of the target argument with __getstate__:
    class Application:
    
        def __init__(self):
            self._event = multiprocessing.Event()
            self._processes = [
                multiprocessing.Process(target=self._worker)
                for _ in range(multiprocessing.cpu_count())]
    
        def _worker(self):
            while not self._event.is_set():
                print(multiprocessing.current_process().name)
                time.sleep(1)
    
        def start(self):
            for process in self._processes:
                print('starting')
                process.start()
    
        def stop(self):
            self._event.set()
            for process in self._processes:
                process.join()
    
        def __getstate__(self):
            state = {}
            for key, value in vars(self).items():
                if key != '_processes':
                    state[key] = value
            return state