OK, so this is a bit convoluted, but I have an async class with a lot of async code.
I want to parallelize a task inside that class by spawning multiple processes to run a blocking task, and within each of those processes I want to create an asyncio loop to handle various subtasks.
I sort of managed to do this with a ThreadPoolExecutor, but when I try to use a ProcessPoolExecutor I get a "Can't pickle local object" error.
This is a simplified version of my code that runs with ThreadPoolExecutor. How can this be parallelized with ProcessPoolExecutor?
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


class MyClass:
    def __init__(self) -> None:
        self.event_loop = None
        # self.pool_executor = ProcessPoolExecutor(max_workers=8)
        self.pool_executor = ThreadPoolExecutor(max_workers=8)
        self.words = ["one", "two", "three", "four", "five"]
        self.multiplier = 2

    async def subtask(self, letter: str):
        await asyncio.sleep(1)
        return letter * self.multiplier

    async def task_gatherer(self, subtasks: list):
        return await asyncio.gather(*subtasks)

    def blocking_task(self, word: str):
        time.sleep(1)
        subtasks = [self.subtask(letter) for letter in word]
        result = asyncio.run(self.task_gatherer(subtasks))
        return result

    async def master_method(self):
        self.event_loop = asyncio.get_running_loop()
        master_tasks = [
            self.event_loop.run_in_executor(
                self.pool_executor,
                self.blocking_task,
                word,
            )
            for word in self.words
        ]
        results = await asyncio.gather(*master_tasks)
        print(results)


if __name__ == "__main__":
    my_class = MyClass()
    asyncio.run(my_class.master_method())
This is a very good question. Both the problem and the solution are quite interesting.
The Problem
One difference between multithreading and multiprocessing is how memory is handled. Threads share a memory space. Processes do not (in general, see below).
Objects are passed to a ThreadPoolExecutor simply by reference. There is no need to create new objects.
But a ProcessPoolExecutor runs its workers in separate memory spaces. To pass objects to a worker, the implementation pickles them on one side and unpickles them again on the other. This detail is often important.
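To make that concrete, here is a minimal sketch of the round trip the executor performs on every callable and argument it receives (the payload here is hypothetical, just for illustration):

import pickle

# In essence, this is what a ProcessPoolExecutor does with each submitted task:
payload = pickle.dumps(("one", 2))        # serialize in the parent process
word, multiplier = pickle.loads(payload)  # deserialize in the worker process
print(word * multiplier)                  # prints "oneone"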
Look carefully at the arguments to blocking_task in the original question. I don't mean word - I mean the first argument: self. The one that's always there. We've seen it a million times and hardly even think about it. To execute blocking_task, a value is required for the argument named self. To run this function in a ProcessPoolExecutor, self must get pickled and unpickled. Now look at some of the member objects of self: there's an event loop and also the executor itself. Neither of those is picklable. That's the problem. There is no way we can run that function, as is, in another process.
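You can reproduce the failure without any executor or asyncio machinery at all. In Python, pickling a bound method also pickles the instance it is bound to, so handing self.blocking_task to a process pool means pickling self and everything it holds. A minimal sketch (Holder is a hypothetical stand-in for MyClass; the exact TypeError text depends on which unpicklable member pickle hits first and on the Python version):

import pickle
from concurrent.futures import ThreadPoolExecutor

class Holder:
    # Stands in for MyClass: the instance owns an unpicklable executor.
    def __init__(self):
        self.pool_executor = ThreadPoolExecutor(max_workers=8)

    def blocking_task(self, word: str):
        return word

holder = Holder()
# Pickling the bound method pickles holder itself, including the executor's
# internal thread locks, so this raises a TypeError along the lines of
# "cannot pickle '_thread.lock' object".
pickle.dumps(holder.blocking_task)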
Admittedly, the traceback message "Cannot pickle local object" leaves a lot to be desired. So does the documentation. But it actually makes total sense that the program works with a ThreadPool but not with a ProcessPool.
Note: There are mechanisms for sharing ctypes objects between Processes. However, as far as I'm aware, there is no way to share Python objects directly. That's why the pickle/unpickle mechanism is used.
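For completeness, a minimal sketch of that ctypes mechanism using multiprocessing.Value, which shares a single C integer between processes - not an arbitrary Python object:

import multiprocessing

def bump(counter):
    # Runs in the child process; the underlying C int lives in shared memory.
    with counter.get_lock():
        counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value("i", 0)  # a shared C int, initially 0
    proc = multiprocessing.Process(target=bump, args=(counter,))
    proc.start()
    proc.join()
    print(counter.value)  # prints 1: the child's increment is visible here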
The Solution
Refactor MyClass to separate the data from the multiprocessing framework. I created a second class, MyTask, which can be pickled and unpickled. I moved a few of the functions from MyClass into it. Nothing of importance has been modified from the original listing - just rearranged.
The script runs successfully with both ProcessPoolExecutor and ThreadPoolExecutor.
import asyncio
import time
# from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor


# Refactored MyClass to break out MyTask
class MyTask:
    def __init__(self):
        self.multiplier = 2

    async def subtask(self, letter: str):
        await asyncio.sleep(1)
        return letter * self.multiplier

    async def task_gatherer(self, subtasks: list):
        return await asyncio.gather(*subtasks)

    def blocking_task(self, word: str):
        time.sleep(1)
        subtasks = [self.subtask(letter) for letter in word]
        result = asyncio.run(self.task_gatherer(subtasks))
        return result


class MyClass:
    def __init__(self):
        self.task = MyTask()
        self.event_loop: asyncio.AbstractEventLoop = None
        self.pool_executor = ProcessPoolExecutor(max_workers=8)
        # self.pool_executor = ThreadPoolExecutor(max_workers=8)
        self.words = ["one", "two", "three", "four", "five"]

    async def master_method(self):
        self.event_loop = asyncio.get_running_loop()
        master_tasks = [
            self.event_loop.run_in_executor(
                self.pool_executor,
                self.task.blocking_task,
                word,
            )
            for word in self.words
        ]
        results = await asyncio.gather(*master_tasks)
        print(results)


if __name__ == "__main__":
    my_class = MyClass()
    asyncio.run(my_class.master_method())
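For reference, both executor variants print the same result, in submission order because asyncio.gather preserves it:

[['oo', 'nn', 'ee'], ['tt', 'ww', 'oo'], ['tt', 'hh', 'rr', 'ee', 'ee'], ['ff', 'oo', 'uu', 'rr'], ['ff', 'ii', 'vv', 'ee']]

The whole run takes roughly two seconds: the five words fit into the eight workers at once, and each word costs one second of blocking sleep plus one second of concurrently awaited subtasks.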