
await run_in_executor() fails to unblock upon termination of synchronous function


Consider the following simple, self-contained Python3 program:

import asyncio
import multiprocessing
import time
loop = asyncio.get_event_loop()

def sub_process(event, i):
    async def async_function():
        await loop.run_in_executor(None, event.wait)  # <-- problematic line
        print(f"successfully awaited on process {i}!")  # <-- not all processes reach this line!
    loop.run_until_complete(async_function())


if __name__ == '__main__':
    event = multiprocessing.Event()
    processes = [multiprocessing.Process(target=sub_process, args=(event, i)) for i in range(multiprocessing.cpu_count())]
    for process in processes:
        process.start()
    time.sleep(2)
    event.set()
    for process in processes:
        process.join()
    print("success.")

I am getting non-deterministic failures in which some, but not all, processes fail to proceed past the "problematic" line even after the event is set. The number of processes that fail to unblock is also non-deterministic (usually between 12 and 16 on my 16-vCPU AWS instance). I am only able to reproduce this on Linux machines (not macOS).

This seems like an issue with asyncio more so than with multiprocessing. I can tell this because if I replace event.wait with a simple synchronous function which wraps event.wait(), the synchronous function does proceed past the event.wait() call, i.e.:

def sub_process(event, i):
    def sync_function():
        event.wait()
        print("this line is reached...")
    async def async_function():
        await loop.run_in_executor(None, sync_function)
        print("...but this line is not.")
    loop.run_until_complete(async_function())

When exactly one process fails to unblock and I terminate by pressing Ctrl+C, I get the following stack trace (the output of the parent and of the stuck child process is interleaved):

Process Process-16:
Traceback (most recent call last):
  File "temp.py", line 21, in <module>
    process.join()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
Traceback (most recent call last):
    pid, sts = os.waitpid(self.pid, flag)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
KeyboardInterrupt
  File "temp.py", line 10, in sub_process
    loop.run_until_complete(async_function())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.8/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

Any insight would be greatly appreciated! Thanks.

EDIT: adding multiprocessing.set_start_method('spawn') right underneath if __name__ == '__main__': appears to resolve the issue; thanks @user4815162342. I'd still like to know more about what's going on.


Solution

  • TL;DR: Call multiprocessing.set_start_method('spawn') at the start of your program, inside the if __name__ == '__main__': block, to fix the problem on Linux. The problem could not be reproduced on macOS because spawn is the default start method there (since Python 3.8).

    Unix-like operating systems natively offer the fork primitive for parallelism. A call to fork() efficiently duplicates the current process and continues executing both. fork() is still widely used to execute other programs, where a call to fork() is immediately followed by exec(). Nowadays forking is much less used for parallelism, being largely superseded by multi-threading in which the memory is shared by all threads and which doesn't require expensive inter-process communication. However, fork() would appear to be the perfect tool for multiprocessing, which requires multiple separate processes running the same executable (the Python interpreter), exactly what fork provides. The fact that the forked subprocess continues where the parent left off is also a benefit because all the Python modules loaded in the parent are automatically present in the child.

    On Unix-like systems multiprocessing has traditionally created worker processes by forking the current process and proceeding to run code that listens on the input queue, ready to execute user-supplied jobs. On Windows, which doesn't provide fork(), it uses a different approach: it creates a worker by executing a whole new python.exe, using command-line arguments and environment variables to instruct it to bootstrap into a multiprocessing worker. The downside of the spawn-based approach is that it's significantly slower to initialize a worker pool because it requires a completely new Python instance for each worker in the pool, which can take a lot of time for processes that import large libraries. It also takes more memory because, unlike fork, which uses copy-on-write to save on the duplicated memory pages, running a new executable shares almost nothing between the parent and the child. On the other hand, the benefit of spawning is that each worker starts off with a completely clean slate, which turns out to be of crucial importance.
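To check which start methods the current platform offers, something like the following sketch may help. The helper name start_method_report is made up for illustration. It relies on the documented behaviour that multiprocessing.get_all_start_methods() lists the default method first, and it deliberately avoids get_start_method(), which would lock in the default as a side effect.

```python
import multiprocessing
import sys


def start_method_report():
    """Summarize the start methods supported on this platform.

    Hypothetical helper, not part of multiprocessing itself.
    """
    methods = multiprocessing.get_all_start_methods()
    return {
        "platform": sys.platform,
        "available": methods,
        # The documentation states that the first entry is the default.
        "documented_default": methods[0],
    }


if __name__ == "__main__":
    print(start_method_report())
```

The printed default depends on the operating system and on the Python version, so no particular output is shown here.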

    As it happens, forking has very bad interactions with threading because fork() only duplicates the thread that invoked fork(). Code that created a thread before the fork won't find that thread running in the child, even though its data structures still indicate that the thread was created successfully. This can bite you even if the code you write is not multithreaded: it is enough to use a library that creates helper threads as an implementation detail. Forking also interacts with mutexes: a call to fork() can occur while an unrelated thread holds a mutex. When the child process calls into the code that needs the mutex and tries to acquire it, it will deadlock. POSIX tried to provide ways to mitigate these issues, but these workarounds are extremely difficult and often downright impossible to use correctly and consistently. The Apple-provided macOS system libraries don't even try, so Python devs gave up and just made spawning the default multiprocessing worker start method on macOS. And as of Python 3.4, multiprocessing.set_start_method can be used to request the spawn-based worker creation method on any OS.
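The claim that only the forking thread survives can be observed directly. The following is a minimal sketch for Unix-like systems only (os.fork() does not exist on Windows); the function name count_threads_after_fork is invented for this illustration. Recent Python versions (3.12 and later) may emit a DeprecationWarning when forking a multi-threaded process, which is exactly the hazard described above.

```python
import os
import threading


def count_threads_after_fork():
    """Return (threads_in_parent, threads_in_child) around an os.fork() call."""
    stop = threading.Event()
    # Helper thread that simply blocks until we tell it to stop.
    helper = threading.Thread(target=stop.wait, daemon=True)
    helper.start()
    parent_count = threading.active_count()

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: report how many threads Python sees here, then exit
        # immediately without running any cleanup handlers.
        os.close(read_fd)
        os.write(write_fd, str(threading.active_count()).encode())
        os._exit(0)

    # Parent: read the child's report and reap the child.
    os.close(write_fd)
    child_count = int(os.read(read_fd, 16).decode())
    os.close(read_fd)
    os.waitpid(pid, 0)

    stop.set()
    helper.join()
    return parent_count, child_count


if __name__ == "__main__":
    parent, child = count_threads_after_fork()
    print(f"threads in parent: {parent}, threads in child: {child}")
```

The parent sees the helper thread in addition to its main thread, while the child sees only the thread that called fork(). Any thread pool that the parent had already started is therefore missing its workers in the child.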

    In addition to the interaction with threads, asyncio presents its own set of challenges for fork. An asyncio event loop uses a thread pool for blocking operations used internally, which might be broken in the child. It uses pipes to implement call_soon_threadsafe (and by extension run_in_executor, which uses the same mechanism to alert the event loop when the job is done), as well as asyncio-safe signal handlers. These pipes are inherited by the child process, so its writes to the pipe could end up getting picked up by the parent or possibly even a sibling. It is almost certainly a mistake to try to use the same event loop in a forked child. What might work is to create a new event loop, but odds are that such usage was never systematically tested by the devs and is de facto unsupported.

    If asyncio is to be used in the workers, it is therefore recommended to switch to the spawn or forkserver method for creating multiprocessing workers.
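Applied to the program from the question, that recommendation might look like the sketch below. It makes two changes that are assumptions on my part rather than the only possible fix: it selects spawn through multiprocessing.get_context("spawn") instead of changing the global default, and each worker creates its own event loop with asyncio.run() instead of reusing a module-level loop object. The names wait_for_event and main are introduced here for illustration.

```python
import asyncio
import multiprocessing
import time


async def wait_for_event(event):
    # Use the loop running in *this* process rather than a loop object
    # created at import time.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, event.wait)
    return True


def sub_process(event, i):
    # asyncio.run() creates a fresh event loop and closes it when done.
    asyncio.run(wait_for_event(event))
    print(f"successfully awaited on process {i}!")


def main(n_workers=2):
    # A context object selects the start method for these objects only.
    ctx = multiprocessing.get_context("spawn")
    event = ctx.Event()
    processes = [
        ctx.Process(target=sub_process, args=(event, i))
        for i in range(n_workers)
    ]
    for process in processes:
        process.start()
    time.sleep(1)
    event.set()
    for process in processes:
        process.join()
    return [process.exitcode for process in processes]


if __name__ == "__main__":
    # The guard is required with spawn, because each child re-imports
    # this module and must not start workers of its own.
    print(main())
```

Passing "forkserver" to get_context() instead should work the same way on platforms that support it. With either method the script has to be run from a file, since the children need to import the module that defines sub_process.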