Tags: python, class, multiprocessing, pickle

Can't pickle static method when using multiprocessing


I'm parallelizing some code that uses classes. I knew that Python cannot pickle a method out of the box, so some extra approach is needed. I found a solution here.

My code has two parts that should be parallelized, and both use classes. Below is a very simple version that keeps the same structure as mine; I only removed the method bodies, which were a lot of math and are irrelevant to the output I'm getting.

The problem is that I can pickle one method (shepard_interpolation), but with the other one (calculate_orientation_uncertainty) I get a pickle error. I don't know why this is happening, or why it only partly works.

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    if func_name.startswith('__') and not func_name.endswith('__'): #deal with mangled names
        cls_name = cls.__name__.lstrip('_')
        func_name = '_' + cls_name + func_name
    print cls
    return _unpickle_method, (func_name, obj, cls)


def _unpickle_method(func_name, obj, cls):
    for cls in cls.__mro__:
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

class ImageData(object):

    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)
            
    def shepard_interpolation(self, seeds=20):
        print "ImD - Sucess"       

import copy_reg
import types
from itertools import product
from multiprocessing import Pool

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class VariabilityOfGradients(object):
    def __init__(self):
        pass
        
    @staticmethod
    def aux():
        return "VoG - Sucess" 
            
    @staticmethod
    def calculate_orientation_uncertainty():
        results = []
        pool = Pool()
        for x, y in product(range(1, 5), range(1, 5)):
            result = pool.apply_async(VariabilityOfGradients.aux) 
            results.append(result.get())
        pool.close()
        pool.join()        

        
if __name__ == '__main__':  
    results = []
    pool = Pool()
    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
        results.append(result.get())
    pool.close()
    pool.join()
    
    VariabilityOfGradients.calculate_orientation_uncertainty()   

 

When running, I got

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

This is almost the same error as the one found here. The only difference I can see is that my methods are static.

I noticed that in calculate_orientation_uncertainty, when I call the function as result = pool.apply_async(VariabilityOfGradients.aux()), i.e. with the parentheses (which I never saw in the doc examples), it seems to work. But when I try to get the result, I receive

TypeError: 'int' object is not callable

How can I do this correctly?


Solution

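  • You could define a plain function at the module level and expose it as a staticmethod as well. pickle serializes a function by reference (its module and name), so a function that exists only as a static method inside a class body cannot be looked up as a module-level name, whereas a module-level function can. This preserves the calling syntax, introspection and inheritability features of a staticmethod, while avoiding the pickling problem: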

    def aux():
        return "VoG - Sucess" 
    
    class VariabilityOfGradients(object):
        aux = staticmethod(aux)
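
A minimal sketch of why this pattern helps, assuming that pickle stores a function as a reference to its module and name rather than its code. It checks that the class attribute and the module-level aux are the same object, and that a pickle round trip resolves back to that object. Derived is a hypothetical subclass added only to illustrate inheritance, and the script is assumed to be run directly as the main module.

```python
import pickle


def aux():
    return "VoG - Sucess"


class VariabilityOfGradients(object):
    # Re-expose the module-level function as a static method.
    aux = staticmethod(aux)


class Derived(VariabilityOfGradients):
    # Hypothetical subclass, only here to show that aux is inherited.
    pass


# Looking the static method up on the class returns the plain function itself.
same_object = VariabilityOfGradients.aux is aux

# pickle writes a reference (module + name); loading it resolves the
# reference back to the module-level aux.
restored = pickle.loads(pickle.dumps(VariabilityOfGradients.aux))

print(same_object)
print(restored())
print(Derived.aux())
```

Because the pool only needs to send that reference to the worker, either aux or VariabilityOfGradients.aux can be handed to apply_async.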
    

    For example,

    import copy_reg
    import types
    from itertools import product
    import multiprocessing as mp
    
    def _pickle_method(method):
        """
        Author: Steven Bethard (author of argparse)
        http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
        """
        func_name = method.im_func.__name__
        obj = method.im_self
        cls = method.im_class
        cls_name = ''
        if func_name.startswith('__') and not func_name.endswith('__'):
            cls_name = cls.__name__.lstrip('_')
        if cls_name:
            func_name = '_' + cls_name + func_name
        return _unpickle_method, (func_name, obj, cls)
    
    
    def _unpickle_method(func_name, obj, cls):
        """
        Author: Steven Bethard
        http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
        """
        for cls in cls.mro():
            try:
                func = cls.__dict__[func_name]
            except KeyError:
                pass
            else:
                break
        return func.__get__(obj, cls)
    
    copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
    
    class ImageData(object):
    
        def __init__(self, width=60, height=60):
            self.width = width
            self.height = height
            self.data = []
            for i in range(width):
                self.data.append([0] * height)
    
        def shepard_interpolation(self, seeds=20):
            print "ImD - Success"       
    
    def aux():
        return "VoG - Sucess" 
    
    class VariabilityOfGradients(object):
        aux = staticmethod(aux)
    
        @staticmethod
        def calculate_orientation_uncertainty():
            pool = mp.Pool()
            results = []
            for x, y in product(range(1, 5), range(1, 5)):
                # result = pool.apply_async(aux) # this works too
                result = pool.apply_async(VariabilityOfGradients.aux, callback=results.append)
            pool.close()
            pool.join()
            print(results)
    
    
    if __name__ == '__main__':  
        results = []
        pool = mp.Pool()
        for _ in range(3):
            result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
            results.append(result.get())
        pool.close()
        pool.join()
    
        VariabilityOfGradients.calculate_orientation_uncertainty()   
    

    yields

    ImD - Success
    ImD - Success
    ImD - Success
    ['VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess']
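
As for the TypeError in the question: pool.apply_async(VariabilityOfGradients.aux()) calls aux immediately in the parent process and hands its return value to the pool as though it were the function to run. Nothing needs pickling at that point, which is why it seems to work, but the worker then tries to call that value and the error surfaces in get(). The sketch below reproduces this under one assumption: it uses multiprocessing.pool.ThreadPool as a stand-in with the same apply_async interface, so that no pickling is involved. Here aux returns a string, so the message would mention 'str'; the 'int' in the question presumably reflects what the original, unabridged function returned.

```python
from multiprocessing.pool import ThreadPool


def aux():
    return "VoG - Sucess"


# ThreadPool is a stand-in for Pool (same apply_async interface).
pool = ThreadPool(2)

# Correct: pass the function object itself; the pool calls it later.
good = pool.apply_async(aux).get()

# Incorrect: aux() runs right here, so the pool receives a str and a
# worker then tries to call that str.
try:
    pool.apply_async(aux()).get()
    error_name = None
except TypeError as exc:
    error_name = type(exc).__name__

pool.close()
pool.join()

print(good)
print(error_name)
```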
    

    By the way, result.get() blocks the calling process until the function called by pool.apply_async (e.g. ImageData.shepard_interpolation) is completed. So

    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
        results.append(result.get())
    

    is really calling ImageData.shepard_interpolation sequentially, defeating the purpose of the pool.

    Instead you could use

    for _ in range(3):
        pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()],
                         callback=results.append)
    

    The callback function (e.g. results.append) is called in a thread of the calling process when the function is completed. It is sent one argument -- the return value of the function. Thus nothing blocks the three pool.apply_async calls from being made quickly, and the work done by the three calls to ImageData.shepard_interpolation will be performed concurrently.
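
The difference between the two loops can be sketched with a timing comparison. This is an illustration under stated assumptions, not a benchmark: slow_task, run_blocking and run_with_callback are hypothetical helpers, time.sleep stands in for the real computation, and multiprocessing.pool.ThreadPool is used in place of Pool (same apply_async interface) so the sketch runs without any pickling.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_task(delay):
    # Hypothetical stand-in for an expensive computation.
    time.sleep(delay)
    return delay


def run_blocking(n, delay):
    # get() inside the loop waits for each task before submitting the next.
    pool = ThreadPool(n)
    results = []
    start = time.time()
    for _ in range(n):
        result = pool.apply_async(slow_task, args=[delay])
        results.append(result.get())
    elapsed = time.time() - start
    pool.close()
    pool.join()
    return results, elapsed


def run_with_callback(n, delay):
    # Submit every task first; the callback appends each return value
    # as it arrives, and join() waits for all of them.
    pool = ThreadPool(n)
    results = []
    start = time.time()
    for _ in range(n):
        pool.apply_async(slow_task, args=[delay], callback=results.append)
    pool.close()
    pool.join()
    elapsed = time.time() - start
    return results, elapsed


blocking_results, blocking_time = run_blocking(3, 0.2)
callback_results, callback_time = run_with_callback(3, 0.2)

print("blocking: %.2fs" % blocking_time)
print("callback: %.2fs" % callback_time)
```

With three tasks, the blocking version should take roughly three times the task duration, while the callback version should take roughly one task duration plus pool overhead.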

    Alternatively, it might be simpler to just use pool.map here.

    results = pool.map(ImageData.shepard_interpolation, [ImageData()]*3)
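
A minimal sketch of the pool.map variant, with a few assumptions: shepard_interpolation is simplified to return a string so that map has something to collect, the widths 10, 20 and 30 are made up for illustration, and multiprocessing.pool.ThreadPool again stands in for Pool. Note that [ImageData()]*3 repeats a reference to one instance; a list comprehension gives one instance per task.

```python
from multiprocessing.pool import ThreadPool


class ImageData(object):
    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height

    def shepard_interpolation(self, seeds=20):
        # Simplified stand-in: return a value so that map can collect it.
        return "ImD - Success (%dx%d)" % (self.width, self.height)


# One distinct instance per task (the widths are illustrative only).
images = [ImageData(width=w) for w in (10, 20, 30)]

pool = ThreadPool(3)
# map calls the method once per instance and blocks until all calls are
# done; results come back in the same order as the inputs.
results = pool.map(ImageData.shepard_interpolation, images)
pool.close()
pool.join()

print(results)
```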