Tags: python, python-multiprocessing, python-decorators

Pickling doubly-decorated functions for Multiprocessing


I'm trying to write some boilerplate code to simplify a common workflow: given a function f and a list of argument tuples for it, return a new function that creates the queues and worker processes, hands out the work, and finally gathers the results from the output queue into a list.

I realize that this is quite similar to what map does, but it should end up more flexible (although that is irrelevant for this MWE).

The issue is that the decorated function cannot be pickled, even though I used functools.wraps.
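A minimal sketch of a likely cause, assuming the script runs as `__main__` (the names `decorate`, `double`, and `is_picklable` are illustrative, not from the question): pickle stores a function by reference, as its `__module__` plus `__qualname__`, and `functools.wraps` copies both attributes from the wrapped function. The wrapper therefore claims to be the module-level `double`; pickle looks that name up, finds the original function instead, and refuses to serialize the wrapper. Note that `multiprocessing` only pickles a `Process` target under the "spawn" and "forkserver" start methods; with "fork" the target is inherited and never pickled.

```python
import functools
import pickle


def decorate(func):
    """A functools.wraps-based decorator, analogous to func_wrapper."""
    @functools.wraps(func)
    def inner(*args):
        return func(*args)
    return inner


def double(x):
    return 2 * x


# wraps copied __module__ and __qualname__, so `wrapped` now claims
# to be the module-level function named "double".
wrapped = decorate(double)


def is_picklable(obj) -> bool:
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


print(wrapped.__qualname__)   # double
print(is_picklable(double))   # True: the module-level name resolves to this object
print(is_picklable(wrapped))  # False: the same name resolves to a different object
```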

When I instead passed in the already "queued" version of the function, rather than creating it inside process_wrapper, the code ran without issue. So I suspect the problem is that f is being "double-decorated", but I don't know how to fix that.
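The observation that passing in the pre-wrapped function works is consistent with pickle's lookup by name. A minimal sketch, assuming the script runs as `__main__` and using the hypothetical names `decorate`, `triple`, `double`, and `inner_wrapper`: when the module-level name is rebound to the wrapper, the lookup finds that exact object and pickling succeeds. When a function is decorated twice, as `process_wrapper` does with `func_wrapper`, the module-level name only points at the outer wrapper, so the inner one (the one handed to `mp.Process`) cannot be pickled.

```python
import functools
import pickle


def decorate(func):
    @functools.wraps(func)
    def inner(*args):
        return func(*args)
    return inner


def is_picklable(obj) -> bool:
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def triple(x):
    return 3 * x


# Single decoration rebound to the same name: __main__.triple *is* the
# wrapper, so pickle's by-name lookup finds the very same object.
triple = decorate(triple)


def double(x):
    return 2 * x


# Double decoration, mirroring process_wrapper: inner_wrapper plays the role
# of q_f, and the module-level name is rebound to the outer wrapper (like ff).
inner_wrapper = decorate(double)
double = decorate(inner_wrapper)

print(is_picklable(triple))         # True
print(is_picklable(double))         # True: the outer wrapper owns the name
print(is_picklable(inner_wrapper))  # False: same qualname, different object
```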

```python
import multiprocessing as mp
import queue
from functools import wraps
from typing import Callable


def func_wrapper(f: Callable) -> Callable:
    @wraps(f)
    def q_f(q_in, q_out):
        # Consume argument tuples until the input queue is drained.
        # get_nowait() avoids blocking forever when another worker takes
        # the last item between an empty() check and a get().
        while True:
            try:
                args = q_in.get_nowait()
            except queue.Empty:
                return
            q_out.put(f(*args))
    return q_f


def process_wrapper(f: Callable, num_process: int = 10) -> Callable:
    # f is now q_f, a closure nested inside func_wrapper
    f = func_wrapper(f)

    @wraps(f)
    def ff(ls) -> list:
        with mp.Manager() as manager:
            q_in = manager.Queue()
            q_out = manager.Queue()

            for x in ls:
                q_in.put(x)

            processes = []
            for _ in range(num_process):
                # Under the "spawn" and "forkserver" start methods the target
                # (q_f) has to be pickled to reach the child process.
                p = mp.Process(target=f, args=(q_in, q_out))
                processes.append(p)
                p.start()

            for p in processes:
                p.join()

            output = []
            while not q_out.empty():
                output.append(q_out.get())

        return output
    return ff


if __name__ == "__main__":

    def f(x):
        print(x)

    f = process_wrapper(f)

    list_args = [(x,) for x in range(100)]

    f(list_args)
```

Solution

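  • I ended up using multiprocess instead of multiprocessing, and that resolved the issue. It comes down to the limitations of pickle versus dill for serialization: multiprocess is a fork of multiprocessing that serializes with dill, which can handle closures and other nested functions that pickle can only refer to by name.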
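The pickle limitation mentioned above can be reproduced without multiprocessing at all. A minimal sketch (the names `make_scaler`, `scale_by`, and `is_picklable` are illustrative): the standard `pickle` module refuses a closure returned by a factory function, because it can only refer to it as a local object, whereas dill can serialize such functions by value. If staying on the standard library is preferable, `functools.partial` over a module-level function is a common substitute for a closure, since pickle can serialize it.

```python
import functools
import pickle


def make_scaler(factor: int):
    # scale is a closure; its __qualname__ is "make_scaler.<locals>.scale",
    # which pickle cannot resolve by name.
    def scale(x: int) -> int:
        return factor * x
    return scale


def scale_by(factor: int, x: int) -> int:
    # Module-level function: pickle can refer to it by name.
    return factor * x


def is_picklable(obj) -> bool:
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


closure = make_scaler(3)
partial_fn = functools.partial(scale_by, 3)

print(closure(5), partial_fn(5))  # 15 15
print(is_picklable(closure))      # False
print(is_picklable(partial_fn))   # True
```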