Tags: python, python-3.x, multiprocessing, concurrent.futures, dynamic-class-creation

Unable to use dynamic classes with concurrent.futures.ProcessPoolExecutor


In the code below, I dynamically create an object of the class defined in the string held by the _py attribute, using the generate_object method.

The code works perfectly when I do not use concurrency. However, when I use concurrent.futures, I do not get the desired result; the run fails with an error that says (among other things):

_pickle.PicklingError: Can't pickle <class '__main__.Script_0_1'>: attribute lookup Script_0_1 on __main__ failed

After googling this error, I understood that only picklable objects can be passed as arguments to ProcessPoolExecutor.map(), so I decided to see how I could make my dynamic class picklable.
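
The failure can be reproduced without a process pool, because pickle stores a class by reference (module name plus class name) and then has to find it again by that name. The following is a minimal sketch, not the code from the question: make_object and try_pickle are hypothetical helpers, and the class is created in an explicit throwaway dictionary rather than in a method's local namespace, which has the same effect of leaving the class unreachable from its module.

```python
import pickle

SOURCE = """
class Script_0_1:
    def print_numbers(self):
        print('Numbers = ', 0, 'and', 1)
"""


def make_object(source, class_name):
    """Define the class in a throwaway namespace and instantiate it."""
    # The class records this module as its home, but it is never
    # stored as a module-level name, so pickle cannot look it up.
    namespace = {"__name__": __name__}
    exec(source, namespace)
    return namespace[class_name]()


def try_pickle(obj):
    """Return None on success, or the exception raised by pickle.dumps."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError) as exc:
        return exc
    return None


script = make_object(SOURCE, "Script_0_1")
error = try_pickle(script)
print(type(error).__name__, error)   # the lookup of Script_0_1 fails
print(try_pickle((0, 1)))            # plain integers pickle fine
```

Arguments passed to ProcessPoolExecutor.map() go through the same pickling step, which is why the pool reports the error.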

The problem is that all the other solutions I found create the dynamic object in a different way from the one I use in _string_to_object(). Examples: 1 and 2

I would very much like to keep the dynamic object creation as it is, because a lot of my real code is based on it. I am therefore looking for a concurrent solution that works with the toy code below.

Code

import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

class A:
    def __init__(self):
        self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''
    
    def generate_text(self, name_1, name_2):
        py = self._py.format(name_1, name_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        """ Generate an object of the class inside the string self._py """

        return self._string_to_object(self.generate_text(number_1, number_2))

    def _string_to_object(self, str_class, *args, **kwargs):
        """ Transform a program written inside str_class to an object. """

        exec(str_class)
        class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
        return locals()[class_name](*args, **kwargs)

from functools import partial

print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()

print('Multiprocessing usage')
n_cores = 3
n_calls = 3

def concurrent_function(args):
    first_A = args[0]
    second_A = args[1]
    first_A.print_numbers()
    second_A.print_numbers()

with ProcessPoolExecutor(max_workers=n_cores) as executor:
    args = ( (A().generate_object(i, i+1), A().generate_object(i+1, i+2)) for i in range(n_calls))
    results = executor.map(concurrent_function, args)

Solution

  • I couldn't come up with a way to get the Script classes created in the global namespace while strictly adhering to your current scheme. However:

    Since each call to generate_object creates a new class in a local namespace and instantiates an object of that class, why not postpone that work so that it is done in the process pool? This has the added advantage that the class creation runs in parallel, and only plain integers need to be pickled. We now pass concurrent_function pairs of the integer arguments number_1 and number_2 instead of objects:

    import random
    import codecs
    import re
    from concurrent.futures import ProcessPoolExecutor
    
    
    class A:
        def __init__(self):
            self._py = r'''
    class Script_{0}_{1}:
    \tdef print_numbers(self):
    \t\tprint('Numbers = ', {0}, 'and', {1})
    '''
    
        def generate_text(self, name_1, name_2):
            py = self._py.format(name_1, name_2)
            py = codecs.decode(py, 'unicode_escape')
            return py
    
        def generate_object(self, number_1, number_2):
            """ Generate an object of the class inside the string self._py """
    
            return self._string_to_object(self.generate_text(number_1, number_2))
    
        def _string_to_object(self, str_class, *args, **kwargs):
            """ Transform a program written inside str_class to an object. """
    
            exec(str_class)
            class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
            return locals()[class_name](*args, **kwargs)
    
    """
    from functools import partial
    
    print('Single usage')
    a = A()
    script = a.generate_object(1, 2)
    script.print_numbers()
    """
    
    
    def concurrent_function(args):
        for arg in args:
            obj = A().generate_object(arg[0], arg[1])
            obj.print_numbers()
    
    def main():
        print('Multiprocessing usage')
        n_cores = 3
        n_calls = 3
    
        with ProcessPoolExecutor(max_workers=n_cores) as executor:
            args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
            # wait for completion of all tasks:
            results = list(executor.map(concurrent_function, args))
    
    if __name__ == '__main__':
        main()
    

    Prints (the order of lines coming from different worker processes may vary):

    Multiprocessing usage
    Numbers =  0 and 1
    Numbers =  1 and 2
    Numbers =  1 and 2
    Numbers =  2 and 3
    Numbers =  2 and 3
    Numbers =  3 and 4
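
One caveat about _string_to_object: it relies on exec writing the new class into the function's locals(). As far as I know, Python 3.13 changed this (PEP 667), so that locals() inside a function returns an independent snapshot and names created by exec no longer show up in it. The sketch below keeps the same idea but passes exec an explicit namespace, which should behave the same on older and newer versions. The function name string_to_object and the simplified regular expression are assumptions for illustration, not part of the original code.

```python
import re

SOURCE = """
class Script_1_2:
    def numbers(self):
        return (1, 2)
"""


def string_to_object(str_class, *args, **kwargs):
    """Execute the class source in its own namespace and instantiate the class."""
    namespace = {"__name__": __name__}   # explicit dict instead of locals()
    exec(str_class, namespace)
    # Take the identifier that follows the first "class" keyword.
    class_name = re.search(r"class\s+(\w+)", str_class).group(1)
    return namespace[class_name](*args, **kwargs)


obj = string_to_object(SOURCE)
print(type(obj).__name__, obj.numbers())
```

The resulting object is still not picklable, so it still has to be created inside the worker as shown above.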
    

    A More Efficient Way

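    If the generated classes differ only in the numbers baked into them, there is no need to use exec; a closure captures the same values. As before, the closure is created inside the worker, because nested functions cannot be pickled either: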

    from concurrent.futures import ProcessPoolExecutor
    
    def make_print_function(number_1, number_2):
        def print_numbers():
            print(f'Numbers = {number_1} and {number_2}')
    
        return print_numbers
    
    
    
    def concurrent_function(args):
        for arg in args:
            fn = make_print_function(arg[0], arg[1])
            fn()
    
    
    def main():
        print('Multiprocessing usage')
        n_cores = 3
        n_calls = 3
    
        with ProcessPoolExecutor(max_workers=n_cores) as executor:
            args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
            # wait for completion of all tasks:
            results = list(executor.map(concurrent_function, args))
    
    if __name__ == '__main__':
        main()
    

    Prints (the order of lines coming from different worker processes may vary):

    Multiprocessing usage
    Numbers = 0 and 1
    Numbers = 1 and 2
    Numbers = 1 and 2
    Numbers = 2 and 3
    Numbers = 2 and 3
    Numbers = 3 and 4
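
If the objects themselves have to cross the process boundary, another option is an ordinary module-level class that takes the numbers as constructor arguments. This is a different technique from the dynamic class creation in the question: the Script class below is a hypothetical stand-in, and it only applies when the generated classes differ in data rather than in code. Because the class can be found by name in the worker, its instances can be pickled and passed to map() directly.

```python
from concurrent.futures import ProcessPoolExecutor


class Script:
    """Module-level class, so pickle can find it by name in the worker."""

    def __init__(self, number_1, number_2):
        self.number_1 = number_1
        self.number_2 = number_2

    def describe(self):
        return f"Numbers = {self.number_1} and {self.number_2}"


def concurrent_function(scripts):
    # Runs in a worker process; the Script objects arrive pickled.
    return [script.describe() for script in scripts]


def main():
    pairs = [(Script(i, i + 1), Script(i + 1, i + 2)) for i in range(3)]
    with ProcessPoolExecutor(max_workers=3) as executor:
        # map() yields results in the order of the inputs.
        for lines in executor.map(concurrent_function, pairs):
            print("\n".join(lines))


if __name__ == "__main__":
    main()
```

Returning the strings and printing them in the parent also keeps the output in input order, since map() yields results in the order the tasks were submitted.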
    

    Using an Object Cache to Avoid Creating New Objects Unnecessarily

    If the same pair of arguments can reach a worker more than once, the worker can keep the objects it has already built in a module-level dictionary and reuse them instead of running exec again:

    obj_cache = {} # each process will have its own
    
    def concurrent_function(args):
        for arg in args:
            # was an object created with this set of arguments: (arg[0], arg[1])?
            obj = obj_cache.get(arg)
            if obj is None: # must create new object
                obj = A().generate_object(arg[0], arg[1])
                obj_cache[arg] = obj # save object for possible future use
            obj.print_numbers()
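
The same per-process caching can also be written with functools.lru_cache from the standard library. This is a sketch under assumptions: Script is a hypothetical stand-in for whatever A().generate_object() returns, and get_object is an illustrative name. As with obj_cache, each worker process ends up with its own independent cache.

```python
from functools import lru_cache


class Script:
    """Hypothetical stand-in for an object built by A().generate_object()."""

    def __init__(self, number_1, number_2):
        self.numbers = (number_1, number_2)


@lru_cache(maxsize=None)   # unbounded cache, one per process
def get_object(number_1, number_2):
    # Called only the first time a given pair of arguments is seen.
    return Script(number_1, number_2)


def concurrent_function(args):
    return [get_object(*arg).numbers for arg in args]


first = get_object(1, 2)
second = get_object(1, 2)
print(first is second)            # the cached object is reused
print(get_object.cache_info())    # hit and miss counters for this process
```

The arguments must be hashable for either variant to work, which holds for the integer pairs used here.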