I'm trying to write some boilerplate code to simplify a common workflow: given some function f that takes in arguments, and a list of those argument tuples, return a new function that creates the queues and processes, passes off the work, and finally gathers the results from a queue into a list. I realize that this is quite similar to what map does, but this should be more flexible (although that's irrelevant for the MWE). The issue is that pickling the decorated function does not work, even after I used functools.wraps.
When I tested by passing in the "queued" version of the function directly, rather than creating it within process_wrapper, the code ran without issue. I therefore think the issue is that the function f is being "double decorated," but I don't know how to address it.
import multiprocessing as mp
from typing import Callable
from functools import wraps

def func_wrapper(f: Callable) -> Callable:
    @wraps(f)
    def q_f(q_in, q_out):
        # Drain the input queue, applying f to each argument tuple.
        while not q_in.empty():
            args = q_in.get()
            out = f(*args)
            q_out.put(out)
        return
    return q_f

def process_wrapper(f: Callable, num_process: int = 10) -> Callable:
    f = func_wrapper(f)
    @wraps(f)
    def ff(ls) -> list:
        manager = mp.Manager()
        q_in = manager.Queue()
        q_out = manager.Queue()
        for x in ls:
            q_in.put(x)
        # queue_func = f
        processes = []
        for _ in range(num_process):
            # This is where pickling fails: q_f is a nested function.
            p = mp.Process(target=f, args=(q_in, q_out))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        output = []
        while not q_out.empty():
            output.append(q_out.get())
        return output
    return ff

if __name__ == "__main__":
    def f(x):
        print(x)

    f = process_wrapper(f)
    list_args = [(x,) for x in range(100)]
    f(list_args)
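
For reference, here is a self-contained sketch of the test described above that ran without issue: the queued function is defined at module top level and passed in already wrapped, so pickle can find it by name (the names g, queued_g, and run are mine, for illustration):

import multiprocessing as mp
import queue

def g(x):
    print(x)

def queued_g(q_in, q_out):
    # Module-level, so mp.Process can pickle it by qualified name.
    while True:
        try:
            # get_nowait avoids the empty()/get() race in the MWE.
            args = q_in.get_nowait()
        except queue.Empty:
            break
        q_out.put(g(*args))

def run(queued, ls, num_process=10):
    manager = mp.Manager()
    q_in = manager.Queue()
    q_out = manager.Queue()
    for x in ls:
        q_in.put(x)
    processes = [mp.Process(target=queued, args=(q_in, q_out))
                 for _ in range(num_process)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    output = []
    while not q_out.empty():
        output.append(q_out.get())
    return output

if __name__ == "__main__":
    run(queued_g, [(x,) for x in range(100)])
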
I ended up using multiprocess rather than multiprocessing, and it resolved the issue. It has something to do with the limitations of pickle versus dill for serialization: multiprocess serializes with dill, which can handle nested and decorated functions that pickle cannot.
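
For anyone who wants the concrete change: multiprocess is a fork of the standard library multiprocessing with the same API, so the fix is effectively a one-line import swap. A minimal sketch, assuming the package is installed (pip install multiprocess):

import multiprocess as mp  # same API as multiprocessing, but serializes with dill

def func_wrapper(f):
    def q_f(q_in, q_out):
        while not q_in.empty():
            q_out.put(f(*q_in.get()))
    return q_f

if __name__ == "__main__":
    manager = mp.Manager()
    q_in, q_out = manager.Queue(), manager.Queue()
    for x in range(5):
        q_in.put((x,))
    # dill can serialize the nested q_f, so this no longer raises.
    p = mp.Process(target=func_wrapper(lambda x: x * x), args=(q_in, q_out))
    p.start()
    p.join()
    while not q_out.empty():
        print(q_out.get())
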