I'm working through a function in a book by Lopez de Prado. Here is a minimal example of what I'm trying to do:
import numpy as np
import pandas as pd
import multiprocessing as mp
# Define test function
def test_func(idx, a, b):
return [idx, a**1 + b**2]
# Rebind the name to the vectorized object so it broadcasts over array inputs.
# NOTE(review): this rebinding is exactly what breaks pickling later -- the
# np.vectorize object is no longer the module-level function it was created from.
test_func = np.vectorize(test_func)
# Small random frame with the two value columns the function expects.
test_df = pd.DataFrame(np.random.normal(size = (10, 2)), columns = ['a', 'b'])
# Expose the row index as a regular column.
# NOTE(review): reset_index names the new column 'index', but test_func's first
# parameter is 'idx' -- presumably a second reason this example fails; confirm.
test_df = test_df.reset_index()
# Map each column name to its Series, so slices can be passed as keyword arguments.
func_dict = {col:test_df[col] for col in test_df.columns}
# Function used to evaluate test_func
def expandCall(kargs):
    """Unpack a job dict: remove the callable stored under 'func' and
    invoke it with the remaining entries as keyword arguments.

    Note: the 'func' key is removed from `kargs` in place.
    """
    target = kargs.pop('func')
    return target(**kargs)
# Chunk boundaries over the row range.
# NOTE(review): np.linspace(0, N, 2) returns only the two endpoints [0, N],
# i.e. a single chunk -- the second argument looks like it should be
# num_chunks + 1; confirm against the book's mpPandasObj.
parts = np.linspace(0, test_df.shape[0], 2).astype(int)
jobs = []
# Build one kwargs dict per chunk: each column sliced to [i, j),
# plus the callable itself under the 'func' key (expandCall pops it back out).
for i, j in zip(parts[:-1], parts[1:]):
    job = {key:func_dict[key][i:j] for key in func_dict}
    job.update({'func':test_func})
    jobs.append(job)
# Two worker processes. NOTE(review): under the spawn start method (Windows
# default) each job dict -- including the np.vectorize object -- must be
# pickled to reach the workers, which is where the PicklingError arises.
pool = mp.Pool(processes = 2)
# Lazy iterator over results in completion order (not submission order).
outputs = pool.imap_unordered(expandCall, jobs)
out = []
# Process asynchronous output, report progress
for out_ in outputs:
    out.append(out_)
# No more work will be submitted; wait for the workers to exit.
pool.close()
pool.join()
I get the error
PicklingError: Can't pickle <function test_func at 0x0000027FBF960F40>: it's not the same object as __main__.test_func
If I'm reading the book properly, Lopez de Prado provides a solution:
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
def _pickle_method(method):
    """Reduce a bound method to a (reconstructor, args) pair for pickling.

    Fix (Python 3 port): bound methods expose ``__func__`` and ``__self__``;
    the ``im_func`` / ``im_self`` / ``im_class`` attributes used in the book
    are Python-2-only and raise AttributeError on Python 3, which is why the
    original version "doesn't work".

    :param method: a bound method object (types.MethodType).
    :return: tuple understood by pickle: (_unpickle_method, (name, obj, cls)).
    """
    func_name = method.__func__.__name__
    obj = method.__self__
    # im_class no longer exists in Python 3; recover the class from the
    # instance. NOTE: for a classmethod, __self__ is the class itself, so
    # this yields its metaclass -- confirm if classmethods need pickling.
    cls = obj.__class__
    return _unpickle_method, (func_name, obj, cls)
import copyreg, types
# Register the custom reduce/reconstruct pair for bound methods so pickle
# (and hence multiprocessing) can serialize them.
copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method)
However, it doesn't work for me. Perhaps due to changes in the multiprocessing module since the book was published. Unfortunately, I don't understand the solution well enough to update it. Any help getting my minimal example to run would be appreciated.
The method you posted allows you to pickle class methods. Lambdas, however, are not class methods — at least not in Python 3 — and the standard Python pickler doesn't allow pickling lambdas.
You can use cloudpickle, which allows you to pickle lambdas: call `obj = cloudpickle.dumps(real_object)`, pass that `obj` to the pool, and then call `real_object = cloudpickle.loads(obj)` on the other side.
`np.vectorize` is written in C and would need pickling support added in C, but you can get around that by wrapping it in a simple wrapper class that makes it picklable — see the `__getstate__` and `__setstate__` docs.
import numpy as np
import pandas as pd
import multiprocessing as mp
class vectorize_wrapper:
    """Picklable wrapper around ``np.vectorize``.

    ``np.vectorize`` objects cannot be pickled directly, so the wrapper
    pickles only the underlying plain Python function (``__getstate__``)
    and re-vectorizes it when unpickled (``__setstate__``).
    """

    def __init__(self, pyfunc):
        self.func = np.vectorize(pyfunc)

    def __call__(self, *args, **kwargs):
        # Delegate straight to the vectorized function.
        return self.func(*args, **kwargs)

    def __getstate__(self):
        # Persist only the original Python function; it is picklable
        # by qualified name while the np.vectorize object is not.
        return self.func.pyfunc

    def __setstate__(self, state):
        # Rebuild the vectorized callable from the restored function.
        self.func = np.vectorize(state)
def test_func(idx, a, b):
return [idx, a**1 + b**2]
# Wrap the plain function so the vectorized callable survives pickling.
test_func_vectorized = vectorize_wrapper(test_func)
import pickle
# Round-trip through pickle: dumps stores only the underlying pyfunc
# (via __getstate__); loads re-vectorizes it (via __setstate__).
a = pickle.dumps(test_func_vectorized)
b = pickle.loads(a)
The main restriction is that the vectorized function must be importable: you cannot have another function or object with the same name in the same file, and it needs to be in the global scope. In other words, Python must be able to reach it via
`from some_module import my_python_function`
because this is what the standard pickler does. I think cloudpickle doesn't have this strict requirement.