I have several Python classes, and all of them have a method called push. What I am trying to do is run these push methods in parallel.
I have tried multiple approaches, for example:
#from pathos.multiprocessing import ProcessingPool as Pool
from multiprocessing import Pool
def push_wrapper(obj):
    """Call ``obj.push()`` on the given object.

    The value returned by ``push()`` is discarded, so this function always
    returns ``None``.
    """
    obj.push()
# One instance per class; each must be picklable, because Pool.map sends the
# objects to the worker processes by pickling them.
class_objects = [class1_obj1, class2_obj2, class3_obj3]
with Pool(processes=max_thread_count) as pool:
    # map() blocks until every push_wrapper call has returned.
    pool.map(push_wrapper, class_objects)
    # Shut the workers down gracefully before the with-block terminates the pool.
    pool.close()
    pool.join()
With this I am getting the following error:
raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
There are some other approaches, such as using pool.apply_async,
but they do not wait for all the methods to complete; they exit immediately. When I add job.wait()
along with pool.apply_async,
it waits for all the threads to complete, but I want to print the result of each thread as soon as it completes.
Your question is somewhat vague. If push_wrapper
is expected to be passed a class instance that has already been instantiated by the main process, which seems to be the case judging by the code, and you only want the method calls to push
to be done in parallel, then:
class A:
    """Demo class: the constructor and ``push`` each print a trace line."""

    def __init__(self):
        print('A.__init__ called')

    def push(self):
        """Print a line showing that ``push`` ran."""
        print('A.push called')
class B:
    """Demo class: the constructor and ``push`` each print a trace line."""

    def __init__(self):
        print('B.__init__ called')

    def push(self):
        """Print a line showing that ``push`` ran."""
        print('B.push called')
def push_wrapper(class_instance):
    """Call ``push()`` on an already-constructed instance; returns ``None``."""
    class_instance.push()
def main():
    """Create the instances in this process, then run each ``push`` in a pool."""
    from multiprocessing import Pool

    # Both constructors run here, so the two __init__ messages are printed
    # before any push() call is dispatched to the pool.
    class_instances = [A(), B()]
    with Pool() as pool:
        # map() blocks until every push_wrapper call has returned.
        pool.map(push_wrapper, class_instances)


if __name__ == '__main__':
    main()
Prints:
A.__init__ called
B.__init__ called
A.push called
B.push called
But this does not fit the description you provided in English, which asks for "all these methods" to be run in parallel.
So, if you would really like both the class instantiation and the subsequent call to push
to be done in parallel (i.e. "all these methods"), then you can pass the name of the class to your worker function as follows:
class A:
    """Demo class: the constructor and ``push`` each print a trace line."""

    def __init__(self):
        print('A.__init__ called')

    def push(self):
        """Print a line showing that ``push`` ran."""
        print('A.push called')
class B:
    """Demo class: the constructor and ``push`` each print a trace line."""

    def __init__(self):
        print('B.__init__ called')

    def push(self):
        """Print a line showing that ``push`` ran."""
        print('B.push called')
def create_class_instance(class_name, *args, **kwargs):
    """Instantiate the class named ``class_name`` found in this module's globals.

    ``args`` and ``kwargs`` are forwarded to the class constructor.

    Raises:
        KeyError: if this module has no global called ``class_name``.
    """
    the_class = globals()[class_name]
    return the_class(*args, **kwargs)
def push_wrapper(class_name):
    """Build an instance of the named class and call its ``push()``.

    Construction happens inside this function, so when it is used as a pool
    task both the constructor and ``push()`` run in the worker.
    """
    class_instance = create_class_instance(class_name)
    class_instance.push()
def main():
    """Run ``push_wrapper`` in a process pool, once per class name."""
    from multiprocessing import Pool

    # Only the names (plain strings) are sent to the workers; the instances
    # are created on the worker side by push_wrapper.
    class_names = ['A', 'B']
    with Pool() as pool:
        # map() blocks until every push_wrapper call has returned.
        pool.map(push_wrapper, class_names)


if __name__ == '__main__':
    main()
Prints:
A.__init__ called
A.push called
B.__init__ called
B.push called
Another Way by Passing Class Types
class A:
    """Demo class: the constructor and ``push`` each print a trace line."""

    def __init__(self):
        print('A.__init__ called')

    def push(self):
        """Print a line showing that ``push`` ran."""
        print('A.push called')
class B:
    """Demo class: the constructor and ``push`` each print a trace line."""

    def __init__(self):
        print('B.__init__ called')

    def push(self):
        """Print a line showing that ``push`` ran."""
        print('B.push called')
def create_class_instance(class_type, *args, **kwargs):
    """Instantiate ``class_type``, forwarding ``args`` and ``kwargs`` to it."""
    return class_type(*args, **kwargs)
    # class_type.__call__(*args, **kwargs) would also work, but only for
    # classes that do not define a __call__ method of their own; calling the
    # class directly is always correct.
def push_wrapper(class_type):
    """Build an instance of ``class_type`` and call its ``push()``.

    Construction happens inside this function, so when it is used as a pool
    task both the constructor and ``push()`` run in the worker.
    """
    class_instance = create_class_instance(class_type)
    class_instance.push()
def main():
    """Run ``push_wrapper`` in a process pool, once per class object."""
    from multiprocessing import Pool

    # The class objects themselves are passed to the workers; the instances
    # are created on the worker side by push_wrapper.
    class_types = [A, B]
    with Pool() as pool:
        # map() blocks until every push_wrapper call has returned.
        pool.map(push_wrapper, class_types)


if __name__ == '__main__':
    main()
Prints:
A.__init__ called
A.push called
B.__init__ called
B.push called
And If You Are Possibly Specifying a Class Name In Another Package/Module
We assume that each class implements an append method:
class A:
    """Minimal list wrapper exposing ``append``, like ``deque`` and ``array``."""

    def __init__(self, l=None):
        # Always copy the input. The previous ``l or []`` kept a reference to
        # a non-empty caller list (so append() mutated it) but silently
        # swapped an empty one for a fresh list. Copying removes that
        # inconsistency and accepts any iterable, as deque and array do.
        self.l = [] if l is None else list(l)

    def append(self, value):
        """Append ``value`` to the wrapped list."""
        self.l.append(value)

    def __repr__(self):
        return f'{type(self).__name__}({self.l})'
def create_class_instance(package_class_name, *args, **kwargs):
    """Instantiate a class given its (optionally dotted) name.

    ``package_class_name`` is either a bare class name, looked up in this
    module's globals, or ``'package.module.ClassName'``, in which case the
    module part is imported first. ``args`` and ``kwargs`` are forwarded to
    the class constructor.

    Raises:
        ImportError: if the module part cannot be imported.
        KeyError: if the class name is not found in the module (or in this
            module's globals for a bare name).
    """
    if '.' in package_class_name:
        import importlib

        # Split on the last dot only: everything before it is the module path.
        package_name, class_name = package_class_name.rsplit('.', 1)
        module = importlib.import_module(package_name)
        # Keyword arguments must be forwarded with ``**``; a single ``*``
        # would pass the keyword *names* as extra positional arguments.
        return vars(module)[class_name](*args, **kwargs)
    return globals()[package_class_name](*args, **kwargs)
def append_wrapper(package_class_name, *args, **kwargs):
    """Build an instance of the named class, append ``4`` to it and print it.

    ``args`` and ``kwargs`` are forwarded to the constructor via
    ``create_class_instance``; the class must provide an ``append`` method.
    """
    class_instance = create_class_instance(package_class_name, *args, **kwargs)
    class_instance.append(4)
    print(class_instance)
def main():
    """Run ``append_wrapper`` in a process pool, once per class specification."""
    from multiprocessing import Pool

    # Each tuple is (class name, *constructor arguments); starmap unpacks it
    # into append_wrapper's positional parameters.
    class_names_and_args = [
        ('A', [1, 2, 3]),
        ('collections.deque', [1, 2, 3]),
        ('array.array', 'i', [1, 2, 3]),
    ]
    with Pool() as pool:
        # starmap() blocks until every append_wrapper call has returned.
        pool.starmap(append_wrapper, class_names_and_args)


if __name__ == '__main__':
    main()
Prints:
A([1, 2, 3, 4])
deque([1, 2, 3, 4])
array('i', [1, 2, 3, 4])