Search code examples
python, multiprocessing

Multiprocessing with multiple class objects


I have several different Python classes, all of which have a method called `push`. What I am trying to do is:

  1. Create one object from each class
  2. Call the `push` method on each object
  3. All these methods should execute in parallel
  4. Log the messages coming from each `push` method as soon as it completes
  5. Wait for all the methods to complete and then exit the script

I have tried several approaches, for example:

    #from pathos.multiprocessing import ProcessingPool as Pool
    from multiprocessing import Pool
    def push_wrapper(obj):
        # Worker function: pool.map below calls it once per element of
        # class_objects; it simply calls that object's push() method.
        obj.push()
    
    
    # NOTE(review): class1_obj1, class2_obj2, class3_obj3 and max_thread_count
    # are defined elsewhere in the asker's script and are not shown here.
    class_objects=[class1_obj1,class2_obj2,class3_obj3]
    with Pool(processes=max_thread_count) as pool:
        # Each object in class_objects is handed to a worker process, which
        # presumably requires pickling it; the "cannot pickle" TypeError quoted
        # in the question points to one of these objects not being picklable.
        pool.map(push_wrapper, class_objects)
        pool.close()
        pool.join()

With this I get the following error:

raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")

I have also tried other approaches, such as `pool.apply_async`, but they do not wait for all the methods to complete; the script exits immediately. When I add `job.wait()` after `pool.apply_async`, it does wait for all the tasks to complete, but I want to print the result of each task as soon as it completes.


Solution

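  • Your question is somewhat vague. Suppose `push_wrapper` expects to be passed a class instance that was created in the main process, which seems to be the case judging from your code. If you only want the calls to `push` to run in parallel, the following works, provided every instance can be pickled, because each one has to be sent to a worker process. Your `cannot pickle` error indicates that at least one of your instances holds something that cannot be pickled. In that case, see the later examples, which create the instances inside the workers instead: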

    class A:
        """Demo class: announces its construction and its push() call."""

        def __init__(self) -> None:
            print('A.__init__ called')

        def push(self) -> None:
            """Print a trace line showing that A.push ran."""
            print('A.push called')
    
    class B:
        """Demo class: announces its construction and its push() call."""

        def __init__(self) -> None:
            print('B.__init__ called')

        def push(self) -> None:
            """Print a trace line showing that B.push ran."""
            print('B.push called')
    
    def push_wrapper(class_instance):
        """Worker entry point: call push() on an already-constructed instance.

        Pool.map in main() calls this once per element; the return value of
        push() is discarded, so this always returns None.
        """
        push_method = class_instance.push
        push_method()
    
    def main():
        """Build one instance per class here, then run each push() in a worker pool."""
        from multiprocessing import Pool

        instances = [A(), B()]  # constructed in the main process

        with Pool() as worker_pool:
            worker_pool.map(push_wrapper, instances)

    # Keeps worker processes that import this module from re-running main().
    if __name__ == '__main__':
        main()
    

    Prints:

    A.__init__ called
    B.__init__ called
    A.push called
    B.push called
    

    But this does not match the requirement stated in your question, namely:

    3. All these methods should execute in parallel

    So, if you really want both the class instantiation and the subsequent call to `push` to run in parallel (i.e. "all these methods"), you can pass the name of the class to your worker function as follows:

    class A:
        """Demo class: announces its construction and its push() call."""

        def __init__(self) -> None:
            print('A.__init__ called')

        def push(self) -> None:
            """Print a trace line showing that A.push ran."""
            print('A.push called')
    
    class B:
        """Demo class: announces its construction and its push() call."""

        def __init__(self) -> None:
            print('B.__init__ called')

        def push(self) -> None:
            """Print a trace line showing that B.push ran."""
            print('B.push called')
    
    
    def create_class_instance(class_name, *args, **kwargs):
        """Instantiate the class bound to *class_name* in this module's globals.

        All remaining positional and keyword arguments are forwarded to the
        constructor. A name missing from the module globals raises KeyError.
        """
        return globals()[class_name](*args, **kwargs)
    
    def push_wrapper(class_name):
        """Worker entry point: build the named class in this process, then push()."""
        create_class_instance(class_name).push()
    
    def main():
        """Send only class names to the pool; each worker instantiates and pushes."""
        from multiprocessing import Pool

        names = ['A', 'B']

        with Pool() as workers:
            workers.map(push_wrapper, names)

    # Keeps worker processes that import this module from re-running main().
    if __name__ == '__main__':
        main()
    

    Prints:

    A.__init__ called
    A.push called
    B.__init__ called
    B.push called
    

    Another Way by Passing Class Types

    class A:
        """Demo class: announces its construction and its push() call."""

        def __init__(self) -> None:
            print('A.__init__ called')

        def push(self) -> None:
            """Print a trace line showing that A.push ran."""
            print('A.push called')
    
    class B:
        """Demo class: announces its construction and its push() call."""

        def __init__(self) -> None:
            print('B.__init__ called')

        def push(self) -> None:
            """Print a trace line showing that B.push ran."""
            print('B.push called')
    
    
    def create_class_instance(class_type, *args, **kwargs):
        """Call *class_type* with the given arguments and return the new object."""
        new_object = class_type(*args, **kwargs)
        return new_object
    
    def push_wrapper(class_type):
        """Worker entry point: instantiate *class_type* here and call its push()."""
        create_class_instance(class_type).push()
    
    def main():
        """Send the class objects themselves to the pool; workers build and push."""
        from multiprocessing import Pool

        types_to_run = [A, B]

        with Pool() as workers:
            workers.map(push_wrapper, types_to_run)

    # Keeps worker processes that import this module from re-running main().
    if __name__ == '__main__':
        main()
    

    Prints:

    A.__init__ called
    A.push called
    B.__init__ called
    B.push called
    

    If the Class Name May Refer to a Class in Another Package/Module

    We assume that each class implements an append method:

    class A:
        """Minimal list wrapper exposing append() and a readable repr."""

        def __init__(self, l=None):
            # Any falsy argument (None, an empty list, ...) is replaced by a
            # brand-new list rather than being stored as-is.
            self.l = l if l else []

        def append(self, value):
            """Append *value* to the wrapped list."""
            self.l.append(value)

        def __repr__(self):
            return f'A({self.l})'
    
    def create_class_instance(package_class_name, *args, **kwargs):
        """Instantiate a class given a bare or dotted class name.

        *package_class_name* is either a bare class name, looked up in this
        module's globals, or ``'package.module.ClassName'``. In the dotted
        case the module part is imported and ClassName is taken from it.
        All remaining positional and keyword arguments are forwarded to the
        class constructor.

        Raises KeyError if a bare name is not a global of this module,
        ImportError if the module part cannot be imported, and
        AttributeError if the imported module has no such attribute.
        """
        import importlib

        module_name, _, class_name = package_class_name.rpartition('.')
        if module_name:
            module = importlib.import_module(module_name)
            # Keyword arguments must be forwarded with ** (the previous
            # `*kwargs` passed only the keyword *names* as positionals).
            return getattr(module, class_name)(*args, **kwargs)
        return globals()[class_name](*args, **kwargs)
    
    def append_wrapper(package_class_name, *args, **kwargs):
        """Worker entry point: build the object, append 4 to it, then print it."""
        obj = create_class_instance(package_class_name, *args, **kwargs)
        obj.append(4)
        print(obj)
    
    def main():
        """Run append_wrapper on each (class name, *constructor args) tuple in a pool."""
        from multiprocessing import Pool

        jobs = [
            ('A', [1, 2, 3]),
            ('collections.deque', [1, 2, 3]),
            ('array.array', 'i', [1, 2, 3]),
        ]

        with Pool() as workers:
            # starmap unpacks each tuple into append_wrapper's arguments.
            workers.starmap(append_wrapper, jobs)

    # Keeps worker processes that import this module from re-running main().
    if __name__ == '__main__':
        main()
    

    Prints:

    A([1, 2, 3, 4])
    deque([1, 2, 3, 4])
    array('i', [1, 2, 3, 4])