Search code examples
picklepython-multiprocessing

Multiprocessing: pool.imap_unordered; can't pickle function


I'm working through a function in a book by Lopez de Prado. Here is a minimal example of what I'm trying to do:

import numpy as np
import pandas as pd
import multiprocessing as mp

# Define test function
def test_func(idx, a, b):
    return [idx, a**1 + b**2]

test_func = np.vectorize(test_func)

test_df = pd.DataFrame(np.random.normal(size = (10, 2)), columns = ['a', 'b'])

test_df = test_df.reset_index()

func_dict = {col:test_df[col] for col in test_df.columns}

# Function used to evaluate test_func
def expandCall(kargs):

    # Get the function argument
    func = kargs['func']

    # Delete it from the fictionary
    del kargs['func']

    # Evaluate function with other arguments
    out = func(**kargs)

    return out

parts = np.linspace(0, test_df.shape[0], 2).astype(int)

jobs = []

for i, j in zip(parts[:-1], parts[1:]):

    job = {key:func_dict[key][i:j] for key in func_dict}
    job.update({'func':test_func})
    jobs.append(job)
    
pool = mp.Pool(processes = 2)

outputs = pool.imap_unordered(expandCall, jobs)

out = []

# Process asynchronous output, report progress
for out_ in outputs:

    out.append(out_)

pool.close()
pool.join()

I get the error

PicklingError: Can't pickle <function test_func at 0x0000027FBF960F40>: it's not the same object as --main--.test_func

If I'm reading the book properly, Lopez de Prado provides a solution:

def _unpickle_method(func_name, obj, cls):
    
    for cls in cls.mro():
        
        try:
            
            func = cls.__dict__[func_name]
            
        except KeyError:
            
            pass
        
        else:
            
            break
        
    return func.__get__(obj, cls)


def _pickle_method(method):
    
    func_name = method.im_func.__name__
    
    obj = method.im_self
    
    cls = method.im_class
    
    return _unpickle_method, (func_name, obj, cls)

import copyreg, types

copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method)

However, it doesn't work for me. Perhaps due to changes in the multiprocessing module since the book was published. Unfortunately, I don't understand the solution well enough to update it. Any help getting my minimal example to run would be appreciated.


Solution

  • the method you posted allows you to pickle class methods, lambdas are not a class method, at least not in python3, and the standard python pickler doesn't allow pickling lambdas.

    you can use cloudpickle which allows you to pickle lambdas, you have to do the obj = cloudpickle.dumps(real_object) and then pass that obj to the pool then real_object = cloudpickle.loads(obj) on the other side.

    np.vectorize is written in C and needs pickling support added in C, but you can get around it by wrapping it in a simple "wrapper" to facilitate pickling, see __getstate__ and __setstate__ docs

    import numpy as np
    import pandas as pd
    import multiprocessing as mp
    
    
    class vectorize_wrapper:
        def __init__(self, pyfunc):
            self.func = np.vectorize(pyfunc)
    
        def __call__(self, *args, **kwargs):
            return self.func(*args, **kwargs)
    
        def __setstate__(self, state):
            self.func = np.vectorize(state)
    
        def __getstate__(self):
            return self.func.pyfunc
    
    def test_func(idx, a, b):
        return [idx, a**1 + b**2]
    
    
    test_func_vectorized = vectorize_wrapper(test_func)
    
    import pickle
    a = pickle.dumps(test_func_vectorized)
    b = pickle.loads(a)
    

    the main restriction is that the vectorized function should be importable, you cannot have another function or object with the same name in the same file, and it needs to be in the global scope, in other words, python can reach it by typing

    from some_module import my_python_function
    

    because this is what the standard pickler does. i think cloudpickle doesn't have this strict requirement.