
Workaround for multiprocessing with local functions


I am porting over a library for a client who is very picky about external dependencies.

Most of the multiprocessing in this library is handled by the pathos ProcessPool, mainly because it can easily deal with locally defined functions.

I'm trying to get some of this functionality back without forcing this dependency (or having to rewrite large chunks of the library). I understand that the following code works because the function is defined at the top level:

import multiprocessing as mp


def f(x):
    return x * x


def main():
    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))


if __name__ == "__main__":
    main()

The following code (which is what I need to get working) fails because the function is only defined in the local scope, and pickle cannot serialize locally defined functions:

import multiprocessing as mp


def main():
    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))


if __name__ == "__main__":
    main()
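The failure comes from how pickle serializes functions: it records only the module and qualified name, so the function must be findable by that name. A qualified name containing <locals> cannot be looked up, so pickle.dumps refuses it. A minimal sketch reproducing this without a pool (top_level, make_local and can_pickle are illustrative names, not part of any library):

```python
import pickle


def top_level(x):
    return x * x


def make_local():
    # Illustrative factory standing in for main() in the question.
    def f(x):
        return x * x
    return f


def can_pickle(obj):
    """Return True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (AttributeError, pickle.PicklingError):
        # The exact exception type differs between Python versions.
        return False
    return True


print(can_pickle(top_level))      # True
print(make_local().__qualname__)  # make_local.<locals>.f
print(can_pickle(make_local()))   # False
```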

In my real use case (not the toy example above), the objects produced by my generator are unmarshallable.
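For reference, marshal only handles Python's core built-in types (numbers, strings, bytes, None, containers of these, and code objects) and raises ValueError for anything else, whereas pickle can serialize instances of importable classes. multiprocessing.Pool sends the mapped items with pickle, so only what is marshaled explicitly is subject to marshal's limits. A small check, where Point and can_marshal are illustrative names:

```python
import marshal
import pickle


class Point:
    def __init__(self, x):
        self.x = x


def can_marshal(obj):
    """Return True if marshal can serialize obj."""
    try:
        marshal.dumps(obj)
    except ValueError:  # raised for unsupported types ("unmarshallable object")
        return False
    return True


print(can_marshal([1, 2.5, "a", (None, b"x")]))  # True
print(can_marshal(Point(1)))                      # False
print(pickle.loads(pickle.dumps(Point(1))).x)     # 1
```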

What would be a good workaround for this specific use case which doesn't require external dependencies?

  • There is a workaround that uses fork, but fork is unsafe on macOS and unavailable on Windows (thanks @Monica and @user2357112).
  • @amsh provided a workaround that seems to work for any function and generator. While it is a great option, its downside is that it requires the function to be defined at global scope.
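For comparison, a fork-based workaround usually takes a shape like the following sketch (an assumption; the thread does not show the exact code). The local function is stored in a module-level variable before the pool starts, the forked children inherit it, and only the top-level trampoline is pickled. _TASK, _call_task and fork_map are hypothetical names. mp.get_context("fork") raises ValueError on Windows, and fork is unsafe on macOS, which is why this is not a general answer.

```python
import multiprocessing as mp

# Hypothetical module-level slot, set in the parent before the pool forks.
_TASK = None


def _call_task(arg):
    # Runs in the child: _TASK was inherited from the parent's memory by fork.
    return _TASK(arg)


def fork_map(func, iterable, processes=4):
    global _TASK
    _TASK = func
    ctx = mp.get_context("fork")  # ValueError where fork is unavailable (e.g. Windows)
    with ctx.Pool(processes) as pool:
        # Only _call_task (a top-level function) and the items are pickled.
        return pool.map(_call_task, iterable)


def main():
    offset = 10

    def f(x):
        return x * x + offset

    print(fork_map(f, range(5)))  # [10, 11, 14, 19, 26]


if __name__ == "__main__":
    main()
```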

Solution

  • The main problem is the closure variables.

    If you don't have any, it can be done like this:

    import marshal
    import multiprocessing
    import types
    from functools import partial
    
    
    def main():
        def internal_func(c):
            return c*c
    
        with multiprocessing.Pool(5) as pool:
            print(internal_func_map(pool, internal_func, [i for i in range(10)]))
    
    
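    def internal_func_map(pool, f, gen):
        # Functions defined inside main() cannot be pickled, but their code objects can be marshaled.
        marshaled = marshal.dumps(f.__code__)
        # partial() of a top-level function is picklable, so it can be sent to the workers.
        return pool.map(partial(run_func, marshaled=marshaled), gen)
    
    
    def run_func(*args, **kwargs):
        # Runs in the worker: rebuild a function from the code object and this module's globals.
        marshaled = kwargs.pop("marshaled")
        func = marshal.loads(marshaled)
    
        restored_f = types.FunctionType(func, globals())
        return restored_f(*args, **kwargs)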
    
    
    if __name__ == "__main__":
        main()
    

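    The idea is that the function's code object (f.__code__) can be serialized with marshal even though the function itself cannot be pickled. In the worker, types.FunctionType turns it back into a callable whose global names are resolved against the worker's module globals. Notice that no external dependencies are needed, only the standard library.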
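The round trip can be checked in a single process. This sketch (make_funcs and its inner functions are illustrative) also shows what the code object does not carry: default argument values live on the function object, so run_func above would drop them unless they are passed back in, and a code object with free variables cannot be turned into a function without a closure.

```python
import marshal
import types


def make_funcs():
    base = 100

    def with_default(c, k=2):
        return c * k

    def with_closure(c):
        return base + c

    return with_default, with_closure


with_default, with_closure = make_funcs()

# Round-trip the code object through marshal, as internal_func_map/run_func do.
code = marshal.loads(marshal.dumps(with_default.__code__))

# Defaults are stored on the function, not the code object, so pass them explicitly.
rebuilt = types.FunctionType(code, globals(), code.co_name, with_default.__defaults__)
print(rebuilt(3))  # 6

# A code object with free variables needs a closure to become a function.
print(with_closure.__code__.co_freevars)  # ('base',)
try:
    types.FunctionType(with_closure.__code__, globals())
except (TypeError, ValueError) as exc:
    print("needs a closure:", exc)
```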

    If closures are needed, the hardest part of this solution is recreating them in the worker. (A closure is a tuple of "cell" objects, which older Python versions give no direct way to construct from code.)
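To see what has to be rebuilt, a closure can be inspected directly. In this sketch (make_adder is an illustrative name), the closure is a tuple of cells in the same order as the code object's co_freevars, and each cell exposes its value through cell_contents, which is writable since Python 3.7:

```python
def make_adder():
    x = 1
    y = 2

    def add(c):
        return x + y + c

    return add


add = make_adder()

# Free variable names and closure cells line up one-to-one.
print(add.__code__.co_freevars)                          # ('x', 'y')
print([cell.cell_contents for cell in add.__closure__])  # [1, 2]

# Cells are mutable containers; create_closure relies on this to fill in values.
add.__closure__[0].cell_contents = 10
print(add(0))  # 12
```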

    Here is the somewhat elaborate working code:

    import marshal
    import multiprocessing
    import pickle
    import types
    from functools import partial
    
    
    class A:
        def __init__(self, a):
            self.a = a
    
    
    def main():
        x = A(1)
    
        def internal_func(c):
            return x.a + c
    
        with multiprocessing.Pool(5) as pool:
            print(internal_func_map(pool, internal_func, [i for i in range(10)]))
    
    
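    def internal_func_map(pool, f, gen):
        # The code object is marshaled; the values captured by the closure are pickled.
        # f.__closure__ is None when the function has no free variables.
        closure = f.__closure__ or ()
        marshaled_func = marshal.dumps(f.__code__)
        pickled_closure = pickle.dumps(tuple(cell.cell_contents for cell in closure))
        return pool.map(partial(run_func, marshaled_func=marshaled_func, pickled_closure=pickled_closure), gen)
    
    
    def run_func(*args, **kwargs):
        # Runs in the worker: rebuild the function from its code object and closure values.
        marshaled_func = kwargs.pop("marshaled_func")
        func = marshal.loads(marshaled_func)
        pickled_closure = kwargs.pop("pickled_closure")
        closure = pickle.loads(pickled_closure)
    
        # A function without free variables must get closure=None.
        cells = create_closure(func, closure) if func.co_freevars else None
        restored_f = types.FunctionType(func, globals(), closure=cells)
        return restored_f(*args, **kwargs)
    
    
    def create_closure(func, original_closure):
        # Generate a throwaway function whose inner function closes over variables
        # with the same names, then copy the original values into its cells.
        indent = " " * 4
        closure_vars_def = f"\n{indent}".join(f"{name}=None" for name in func.co_freevars)
        closure_vars_ref = ",".join(func.co_freevars)
        dynamic_closure = "create_dynamic_closure"
        s = (f"""
    def {dynamic_closure}():
        {closure_vars_def}
        def internal():
            {closure_vars_ref}
        return internal.__closure__
    """)
        # Execute into an explicit namespace: reading names created by exec()
        # back through locals() no longer works as of Python 3.13.
        namespace = {}
        exec(s, namespace)
        created_closure = namespace[dynamic_closure]()
        for closure_var, value in zip(created_closure, original_closure):
            closure_var.cell_contents = value
        return created_closure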
    
    
    if __name__ == "__main__":
        main()
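On Python 3.8 and later, types.CellType can be instantiated directly, which removes the need for the exec-generated factory. A sketch of the same approach under that assumption (pack, unpack, run_packed and local_map are hypothetical names; closure values must still be picklable, and default arguments are not transferred):

```python
import marshal
import multiprocessing
import pickle
import types
from functools import partial


def pack(f):
    """Serialize a local function as (marshaled code, pickled closure values)."""
    values = tuple(cell.cell_contents for cell in (f.__closure__ or ()))
    return marshal.dumps(f.__code__), pickle.dumps(values)


def unpack(marshaled_code, pickled_values):
    """Rebuild the function in the current process."""
    code = marshal.loads(marshaled_code)
    values = pickle.loads(pickled_values)
    # types.CellType(value) creates a cell directly (Python 3.8+).
    # An empty tuple becomes None, which FunctionType expects for no free variables.
    closure = tuple(types.CellType(v) for v in values) or None
    return types.FunctionType(code, globals(), code.co_name, None, closure)


def run_packed(arg, marshaled_code, pickled_values):
    # Top-level function, so it can be pickled and sent to the workers.
    return unpack(marshaled_code, pickled_values)(arg)


def local_map(pool, f, iterable):
    marshaled_code, pickled_values = pack(f)
    task = partial(run_packed, marshaled_code=marshaled_code, pickled_values=pickled_values)
    return pool.map(task, iterable)


def main():
    offset = 1

    def internal_func(c):
        return offset + c

    with multiprocessing.Pool(2) as pool:
        print(local_map(pool, internal_func, range(10)))  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


if __name__ == "__main__":
    main()
```

The design choice here is to ship the closure values rather than the cells themselves and create fresh cells on the receiving side, which avoids generating and executing source code.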
    
    

    Hope that helps or at least gives you some ideas on how to tackle this problem!