I have a multi-threaded program, and I want to let the user choose how to run it: serially, in multiple threads, or on multiple cores, at least at the top level. Below is a runnable demo that illustrates my program’s logic.
```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from queue import Queue
# from multiprocessing import Queue


class Handler:
    def __init__(self):
        self.queue = Queue()  # this object is essential

    def put(self, item):
        self.queue.put(item)

    def run(self):
        while True:
            item = self.queue.get()
            # do other things on the item ...
            print(item)


class Runner:
    def __init__(self, name):
        self.name = name
        self.a = Handler()
        self.b = Handler()

    def start(self):
        self.a.put(f'{self.name}: hello a')
        self.b.put(f'{self.name}: hello b')
        with ThreadPoolExecutor() as exe:
            futures = [exe.submit(r.run) for r in [self.a, self.b]]
            for future in futures:
                future.result()


# current implementation
def run_in_multi_thread():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    with ThreadPoolExecutor() as exe:
        futures = [exe.submit(r.start) for r in [rA, rB, rC]]
        for future in futures:
            future.result()


# how to implement this?
def run_in_multi_process():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    with ProcessPoolExecutor() as exe:
        futures = [exe.submit(r.start) for r in [rA, rB, rC]]
        for future in futures:
            future.result()


if __name__ == '__main__':
    # run_in_multi_thread()  # this is currently running fine
    run_in_multi_process()  # how to make this work as well?
```
My goal is simple: I want to put many `Runner`s into separate processes so they run truly in parallel.
The problem is that when I change the outermost `ThreadPoolExecutor` to `ProcessPoolExecutor`, Python always raises `TypeError: cannot pickle '_thread.lock' object`.
After some searching, I learned that this is because every `Handler` holds a `queue.Queue`, which uses a `threading.Lock` internally, and lock objects cannot be pickled. However, I cannot avoid using them, because the core functionality relies on `queue.Queue` and `threading.Event` for all `Handler`s to communicate.
I also tried replacing `queue.Queue` with `multiprocessing.Queue`, but then it raises `RuntimeError: Queue objects should only be shared between processes through inheritance`. I have also heard of third-party libraries such as `dill` and `pathos`, but they caused other pickling issues, so I ended up sticking to the standard library.
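The `multiprocessing.Queue` error is a deliberate guard rather than a pickling bug: such a queue can be handed to a child only when that child is created (for example as an argument to `Process`), not pickled on its own afterwards. A minimal sketch contrasting the two cases; `pickle_error` and `worker` are hypothetical helper names:

```python
import multiprocessing as mp
import pickle


def pickle_error(obj):
    """Hypothetical helper: return the exception pickle.dumps raises, or None."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return exc
    return None


def worker(q):
    # The queue arrived as a Process argument, i.e. "through inheritance".
    q.put('hello from the child process')


if __name__ == '__main__':
    q = mp.Queue()

    # Pickling the queue on its own is refused with a RuntimeError. Passing it
    # to ProcessPoolExecutor.submit hits the same guard, because the pool
    # pickles each call's arguments to send them to an already-running worker.
    print(repr(pickle_error(q)))

    # Handing the same queue to a Process at creation time is allowed.
    p = mp.Process(target=worker, args=(q,))
    p.start()
    print(q.get())
    p.join()
```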
Any suggestions on how to refactor my code are welcome.
After much trial and error, I finally came up with a working solution, so I think I can answer my own question. I gave up on `ProcessPoolExecutor` and now use `multiprocessing.Process` directly. Below is the revised demo showing how this works in my use case.
```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Process
from queue import Queue
from time import sleep


class Handler:
    def __init__(self):
        self.queue = Queue()  # this object is essential

    def put(self, item):
        self.queue.put(item)

    def run(self):
        while True:
            item = self.queue.get()
            if item == 'exit':
                break
            # do other things on the item ...
            print(item)
            sleep(1)


class Runner:
    def __init__(self, name):
        self.name = name
        self.a = Handler()
        self.b = Handler()

    def start(self):
        # some dummy messages
        for _ in range(3):
            self.a.put(f'{self.name}: hello a')
            self.b.put(f'{self.name}: hello b')
        # request to shutdown gracefully
        self.a.put('exit')
        self.b.put('exit')
        with ThreadPoolExecutor() as exe:
            futures = [exe.submit(r.run) for r in [self.a, self.b]]
            for f in futures:
                f.result()


# this requires everything to be picklable
def run_in_process_pool():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    with ProcessPoolExecutor() as exe:
        futures = [exe.submit(r.start) for r in [rA, rB, rC]]
        for future in futures:
            future.result()


# this does not pickle anything, but why?
def run_in_processes():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    procs = [Process(target=r.start) for r in [rA, rB, rC]]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


if __name__ == '__main__':
    # run_in_process_pool()  # `TypeError: cannot pickle '_thread.lock' object`
    run_in_processes()  # this is working
```
Surprisingly, this code no longer complains about objects not being picklable! In the end I just use the old-fashioned way (create process -> start -> join). Total run time dropped from 4:39 to 1:05; my CPU has 4 cores, so a parallel run time of roughly 1/4 of the original makes perfect sense. So the next time you have multi-threaded code that uses `queue.Queue` or `threading.Lock` under the hood, you may consider wrapping it in `multiprocessing.Process` directly.
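If you would rather keep `ProcessPoolExecutor`, a common alternative (not the approach taken in this answer) is to never send a `Runner` across the process boundary at all: submit only something picklable, such as the runner's name, and construct the `Runner` inside the worker so its `Queue`s never need to be pickled. A minimal sketch, assuming a hypothetical top-level helper `run_runner` and a `Handler.run` that returns the items it processed instead of printing and sleeping:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from queue import Queue


class Handler:
    def __init__(self):
        self.queue = Queue()  # unpicklable, but it never leaves its process

    def put(self, item):
        self.queue.put(item)

    def run(self):
        # Assumption: collect and return items instead of printing/sleeping.
        processed = []
        while True:
            item = self.queue.get()
            if item == 'exit':
                break
            processed.append(item)
        return processed


class Runner:
    def __init__(self, name):
        self.name = name
        self.a = Handler()
        self.b = Handler()

    def start(self):
        for _ in range(3):
            self.a.put(f'{self.name}: hello a')
            self.b.put(f'{self.name}: hello b')
        self.a.put('exit')
        self.b.put('exit')
        with ThreadPoolExecutor() as exe:
            futures = [exe.submit(h.run) for h in (self.a, self.b)]
            return [f.result() for f in futures]


def run_runner(name):
    """Hypothetical helper: build and run the Runner inside the worker.

    Apart from a reference to this function, only `name` (a str) is sent to
    the worker and only the returned lists of strings come back, so those
    are the only things that get pickled.
    """
    return Runner(name).start()


def run_in_process_pool(names):
    with ProcessPoolExecutor() as exe:
        return list(exe.map(run_runner, names))


if __name__ == '__main__':
    for result in run_in_process_pool(['A', 'B', 'C']):
        print(result)
```

Because nothing lock-holding crosses the process boundary, this should not depend on the start method; the trade-off is that the parent never holds the `Runner` objects, so anything it needs back must be returned from the worker.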
However, I still want to know why only `ProcessPoolExecutor` requires everything to be picklable while `Process` does not. As far as I know, `Process` makes a system call to create a new process, and the implementation depends on the OS. I am on a Windows machine, but running under WSL2, so I guess I am effectively on Linux, where the default start method should be `'fork'`. Does `fork` simply clone the whole process, with no need to pickle anything?
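The `fork` guess can be checked directly by forcing each start method with `multiprocessing.get_context`, instead of relying on the platform default (which depends on the OS and the Python version). Under `spawn` and `forkserver`, `Process.start()` pickles the `Process` object, including its `target`, to hand it to the new interpreter; under `fork` the child inherits a copy of the parent's memory. By contrast, `ProcessPoolExecutor` hands each submitted call to workers that are already running, so that call is pickled whatever the start method. A minimal sketch; `Holder` and `try_start` are hypothetical names, with `Holder` standing in for `Runner` by owning a `threading.Lock`:

```python
import multiprocessing as mp
import threading


class Holder:
    """Hypothetical stand-in for Runner: it owns an unpicklable lock."""

    def __init__(self):
        self.lock = threading.Lock()

    def start(self):
        with self.lock:
            print('child ran in', mp.current_process().name)


def try_start(method):
    """Start a Process whose target is a bound method, using `method`.

    Returns the exception raised by Process.start(), or the child's exit
    code if the process started and was joined.
    """
    ctx = mp.get_context(method)
    p = ctx.Process(target=Holder().start)
    try:
        p.start()  # spawn/forkserver pickle p (and its target) here
    except Exception as exc:
        return exc
    p.join()
    return p.exitcode


if __name__ == '__main__':
    for method in mp.get_all_start_methods():
        print(f'{method:>10}: {try_start(method)!r}')
```

On Linux (including WSL2), the `fork` line is expected to report exit code `0`, while `spawn` and `forkserver` should report the same `TypeError` about `'_thread.lock'` that the pool raised.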