In the code below, I dynamically create an object of the class defined in the `_py` attribute by calling the `generate_object` method.
The code works perfectly when I do not use concurrency. However, when I run it through `concurrent.futures`, I do not get the desired result; the error output includes (among other things):
_pickle.PicklingError: Can't pickle <class '__main__.Script_0_1'>: attribute lookup Script_0_1 on __main__ failed
After googling this error, I understood that only picklable objects can be passed as arguments to `ProcessPoolExecutor.map()`, so I decided to see how I could make my dynamic class picklable.
The problem is that all the other solutions to this problem create the dynamic object in a different manner (different from what I'm using in `_string_to_object()`). Examples: 1 and 2
I would very much like to keep the dynamic object creation the way it is right now because a lot of my real code is based on it, therefore I am looking for a concurrent solution that works with this toy code below.
Code
import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
class A:
    """Build ``Script_<a>_<b>`` classes from a source-code template.

    The template is stored as text in ``self._py``. ``generate_object``
    fills it in, executes it and returns an instance of the class that the
    resulting source defines.
    """

    def __init__(self):
        # Raw string: the backslash-t sequences stay as two literal
        # characters here and are decoded into real tabs by generate_text().
        self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''

    def generate_text(self, name_1, name_2):
        """Return the template source with both placeholders filled in.

        ``name_1`` and ``name_2`` replace ``{0}`` and ``{1}``. The escape
        sequences written literally in the raw template are then decoded
        so that the result is indented, executable Python source.
        """
        py = self._py.format(name_1, name_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        """ Generate an object of the class inside the string self._py """
        return self._string_to_object(self.generate_text(number_1, number_2))

    def _string_to_object(self, str_class, *args, **kwargs):
        """ Transform a program written inside str_class to an object.

        ``str_class`` is executed, the name of the first ``class`` statement
        found in it is looked up, and that class is instantiated with
        ``*args`` and ``**kwargs``.

        Raises:
            ValueError: if ``str_class`` contains no ``class`` statement.

        NOTE: ``exec`` runs arbitrary code; only pass trusted source text.
        """
        # Execute into an explicit dict instead of the function's locals:
        # from Python 3.13 on, names created by a bare exec() inside a
        # function are no longer visible through a later locals() call.
        # The module globals are still supplied so the generated code can
        # use module-level names, exactly as before.
        namespace = {}
        exec(str_class, globals(), namespace)
        found = re.search(r"class\s+(\w+)", str_class)
        if found is None:
            raise ValueError("str_class does not contain a class definition")
        return namespace[found.group(1)](*args, **kwargs)
from functools import partial
# Single-process run: build one generated object in this process and call it directly.
print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()
# Settings for the process-pool run below: n_cores is the pool size and
# n_calls is the number of tasks that get submitted.
print('Multiprocessing usage')
n_cores = 3
n_calls = 3
def concurrent_function(args):
    """Worker task: print the numbers of a pair of generated objects.

    ``args`` is an indexable pair; items 0 and 1 must each provide a
    ``print_numbers()`` method. Nothing is returned.
    """
    # Fetch both objects before calling either one, so that a too-short
    # ``args`` fails before anything is printed.
    pair = (args[0], args[1])
    for script in pair:
        script.print_numbers()
# Each task is a pair of generated objects. They have to be pickled to reach
# a worker process, and because their classes only exist inside an exec()
# call this is the step that raises the PicklingError the question is about.
args = (
    (A().generate_object(i, i + 1), A().generate_object(i + 1, i + 2))
    for i in range(n_calls)
)
with ProcessPoolExecutor(max_workers=n_cores) as executor:
    results = executor.map(concurrent_function, args)
I couldn't come up with a way of getting the `Script` classes to be created in the global namespace while strictly adhering to your current scheme. However:
Since each invocation of the method `generate_object` creates a new class in the local namespace and instantiates an object of that class, why not postpone that work so that it is done in the process pool? This has the added advantage that the class creation runs in parallel, and no pickling of the dynamically created objects is required. We now pass to `concurrent_function` the integer arguments `number_1` and `number_2` (as tuples) instead of the objects themselves:
import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor
class A:
    """Build ``Script_<a>_<b>`` classes from a source-code template.

    The template is stored as text in ``self._py``. ``generate_object``
    fills it in, executes it and returns an instance of the class that the
    resulting source defines.
    """

    def __init__(self):
        # Raw string: the backslash-t sequences stay as two literal
        # characters here and are decoded into real tabs by generate_text().
        self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''

    def generate_text(self, name_1, name_2):
        """Return the template source with both placeholders filled in.

        ``name_1`` and ``name_2`` replace ``{0}`` and ``{1}``. The escape
        sequences written literally in the raw template are then decoded
        so that the result is indented, executable Python source.
        """
        py = self._py.format(name_1, name_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        """ Generate an object of the class inside the string self._py """
        return self._string_to_object(self.generate_text(number_1, number_2))

    def _string_to_object(self, str_class, *args, **kwargs):
        """ Transform a program written inside str_class to an object.

        ``str_class`` is executed, the name of the first ``class`` statement
        found in it is looked up, and that class is instantiated with
        ``*args`` and ``**kwargs``.

        Raises:
            ValueError: if ``str_class`` contains no ``class`` statement.

        NOTE: ``exec`` runs arbitrary code; only pass trusted source text.
        """
        # Execute into an explicit dict instead of the function's locals:
        # from Python 3.13 on, names created by a bare exec() inside a
        # function are no longer visible through a later locals() call.
        # The module globals are still supplied so the generated code can
        # use module-level names, exactly as before.
        namespace = {}
        exec(str_class, globals(), namespace)
        found = re.search(r"class\s+(\w+)", str_class)
        if found is None:
            raise ValueError("str_class does not contain a class definition")
        return namespace[found.group(1)](*args, **kwargs)
"""
from functools import partial
print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()
"""
def concurrent_function(args):
    """Worker task: build and use one generated object per integer pair.

    ``args`` is an iterable of indexable items; items 0 and 1 of each one
    are handed to ``A().generate_object``. The objects are created inside
    the worker, so only the plain arguments travel between processes.
    """
    for pair in args:
        builder = A()
        script = builder.generate_object(pair[0], pair[1])
        script.print_numbers()
def main():
    """Run the demo: distribute the integer-pair tasks over a process pool."""
    print('Multiprocessing usage')
    worker_count = 3
    task_count = 3
    # Every task is a tuple holding two (number_1, number_2) pairs.
    tasks = (((i, i + 1), (i + 1, i + 2)) for i in range(task_count))
    with ProcessPoolExecutor(max_workers=worker_count) as pool:
        # Draining the result iterator waits for completion of all tasks.
        list(pool.map(concurrent_function, tasks))


# Only start the pool when this file is run as a script.
if __name__ == '__main__':
    main()
Prints:
Multiprocessing usage
Numbers = 0 and 1
Numbers = 1 and 2
Numbers = 1 and 2
Numbers = 2 and 3
Numbers = 2 and 3
Numbers = 3 and 4
A More Efficient Way
There is no need to use `exec`. Instead, use closures:
from concurrent.futures import ProcessPoolExecutor
def make_print_function(number_1, number_2):
    """Return a zero-argument function that prints the two given numbers.

    The returned closure captures ``number_1`` and ``number_2``; the
    message is formatted each time the closure is called.
    """

    def print_numbers():
        """Print the captured pair of numbers."""
        print(f'Numbers = {number_1} and {number_2}')

    return print_numbers
def concurrent_function(args):
    """Worker task: build and call one printing closure per integer pair.

    ``args`` is an iterable of indexable items; items 0 and 1 of each one
    are passed to ``make_print_function`` and the returned function is
    called straight away.
    """
    for pair in args:
        printer = make_print_function(pair[0], pair[1])
        printer()
def main():
    """Run the demo: distribute the integer-pair tasks over a process pool."""
    print('Multiprocessing usage')
    worker_count = 3
    task_count = 3
    # Every task is a tuple holding two (number_1, number_2) pairs.
    tasks = (((i, i + 1), (i + 1, i + 2)) for i in range(task_count))
    with ProcessPoolExecutor(max_workers=worker_count) as pool:
        # Draining the result iterator waits for completion of all tasks.
        list(pool.map(concurrent_function, tasks))


# Only start the pool when this file is run as a script.
if __name__ == '__main__':
    main()
Prints:
Multiprocessing usage
Numbers = 0 and 1
Numbers = 1 and 2
Numbers = 1 and 2
Numbers = 2 and 3
Numbers = 2 and 3
Numbers = 3 and 4
Using an Object Cache to Avoid Creating New Objects Unnecessarily
obj_cache = {}  # each process will have its own


def concurrent_function(args):
    """Worker task: print via generated objects, reusing cached ones.

    ``args`` is an iterable of hashable, indexable items. Each item is the
    cache key; an object is generated from items 0 and 1 only when
    ``obj_cache`` holds no object for that key yet.
    """
    for key in args:
        # Nothing usable stored for this argument set yet: create the
        # object and save it for possible future use.
        if obj_cache.get(key) is None:
            obj_cache[key] = A().generate_object(key[0], key[1])
        obj_cache[key].print_numbers()