I am trying to create attributes of an instance in parallel to learn more about multiprocessing. My objective is to avoid creating the attributes sequentially, assuming they are independent of each other. I read that multiprocessing creates its own memory space and that it is possible to establish a connection between the processes.
I think this connection can help me share the same object among the workers, but I did not find any post showing how to implement it. If I try to create the attributes in parallel, I am not able to access them in the main process after the workers finish. Can someone help me with that? What do I need to do?
Below I provide an MRE of what I am trying to achieve using the MPIRE package. I hope it illustrates my question.
from mpire import WorkerPool
import os


class B:
    def __init__(self):
        pass


class A:
    def __init__(self):
        self.model = B()

    def do_something(self, var):
        if var == 'var1':
            self.model.var1 = var
        elif var == 'var2':
            self.model.var2 = var
        else:
            print('other var.')

    def do_something2(self, model, var):
        if var == 'var1':
            model.var1 = var
            print(f"Worker {os.getpid()} is processing do_something2({var})")
        elif var == 'var2':
            model.var2 = var
            print(f"Worker {os.getpid()} is processing do_something2({var})")
        else:
            print(f"Worker {os.getpid()} is processing do_something2({var})")

    def init_var(self):
        self.do_something('var1')
        self.do_something('test')
        print(self.model.var1)
        print(vars(self.model).keys())

        # Trying to create the attributes in parallel
        print('')
        self.model = B()
        self.__sets_list = ['var1', 'var2', 'var3']
        with WorkerPool(n_jobs=3, start_method='fork') as pool:
            model = self.model
            pool.set_shared_objects(model)
            pool.map(self.do_something2, self.__sets_list)

        print(self.model.var1)
        print(vars(self.model).keys())


def main():  # this main will be in another file that call different classes
    obj = A()
    obj.init_var()


if __name__ == '__main__':
    main = main()
It generates the following output:
python src/test_change_object.py
other var.
var1
dict_keys(['var1'])

Worker 20040 is processing do_something2(var1)
Worker 20041 is processing do_something2(var2)
Worker 20042 is processing do_something2(var3)
Traceback (most recent call last):
  File "/mnt/c/git/bioactives/src/test_change_object.py", line 59, in <module>
    main = main()
  File "/mnt/c/git/bioactives/src/test_change_object.py", line 55, in main
    obj.init_var()
  File "/mnt/c/git/bioactives/src/test_change_object.py", line 49, in init_var
    print(self.model.var1)
AttributeError: 'B' object has no attribute 'var1'
I appreciate any help. Thanks!
Would a solution without using mpire work? You could achieve what you are after, i.e. sharing state of complex objects, by using multiprocessing primitives.
TL;DR
This code works:
import os
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, NamespaceProxy
import types


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user-defined data type. The proxy instance will have the namespace and
    functions of the data type (except private/protected callables/attributes). Furthermore, the proxy will be
    picklable and its state can be shared among different processes."""

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            def wrapper(*args, **kwargs):
                return self._callmethod(name, args, kwargs)
            return wrapper
        return result


class B:
    def __init__(self):
        pass

    @classmethod
    def create(cls, *args, **kwargs):
        # Register class
        class_str = cls.__name__
        BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))

        # Start a manager process
        manager = BaseManager()
        manager.start()

        # Create and return this proxy instance. Using this proxy allows sharing of state between processes.
        inst = eval("manager.{}(*args, **kwargs)".format(class_str))
        return inst


class A:
    def __init__(self):
        self.model = B.create()

    def do_something(self, var):
        if var == 'var1':
            self.model.var1 = var
        elif var == 'var2':
            self.model.var2 = var
        else:
            print('other var.')

    def do_something2(self, model, var):
        if var == 'var1':
            model.var1 = var
            print(f"Worker {os.getpid()} is processing do_something2({var})")
        elif var == 'var2':
            model.var2 = var
            print(f"Worker {os.getpid()} is processing do_something2({var})")
        else:
            print(f"Worker {os.getpid()} is processing do_something2({var})")

    def init_var(self):
        self.do_something('var1')
        self.do_something('test')
        print(self.model.var1)
        print(vars(self.model).keys())

        # Trying to create the attributes in parallel
        print('')
        self.model = B.create()
        self.__sets_list = [(self.model, 'var1'), (self.model, 'var2'), (self.model, 'var3')]
        with Pool(3) as pool:
            pool.starmap(self.do_something2, self.__sets_list)

        print(self.model.var1)
        print(vars(self.model).keys())


def main():  # this main will be in another file that calls different classes
    obj = A()
    obj.init_var()


if __name__ == '__main__':
    main = main()
Longer, detailed explanation
Here is what I think is happening. Even though you are setting self.model as a shared object among your workers, the fact that you alter it within the workers forces a copy to be made (i.e., the shared objects are not writable). From the documentation for shared objects in mpire:
For the start method fork these shared objects are treated as copy-on-write, which means they are only copied once changes are made to them. Otherwise they share the same memory address
Therefore, it suggests that shared objects with the fork start method are only useful for cases where you would only be reading from the objects. The documentation also provides such a use case:
This is convenient if you want to let workers access a large dataset that wouldn’t fit in memory when copied multiple times.
Take this with a grain of salt though, since again, I have not used mpire. Hopefully someone with more experience with the library can provide further clarifications.
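To make that read-only use case concrete, here is a minimal sketch of how I understand it would look with mpire's shared objects (the dataset and the count_larger function are made up for illustration): the workers only read from the shared object, so copy-on-write never actually copies it.

from mpire import WorkerPool

# Hypothetical read-only example: the large list is shared with every worker
# via fork's copy-on-write semantics and is never modified inside the workers.
big_dataset = list(range(1_000_000))

def count_larger(dataset, threshold):
    # `dataset` is the shared object, passed by mpire as the first argument.
    return sum(1 for x in dataset if x > threshold)

if __name__ == '__main__':
    with WorkerPool(n_jobs=3, start_method='fork') as pool:
        pool.set_shared_objects(big_dataset)
        print(pool.map(count_larger, [100, 1_000, 10_000]))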
Anyway, moving on, you can achieve this using multiprocessing managers. Managers allow you to share complex objects (an object of class B in this context) between processes and workers. You can also use them to share nested dictionaries, lists, etc. They do this by spawning a server process, where the shared object is actually stored, letting other processes access the object through proxies (more on this later), and pickling/unpickling any arguments and return values passed to and from the server process. As a side note, the reliance on pickling/unpickling also imposes restrictions: in our context, it means that any function arguments and instance variables you create for class B must be picklable.
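As a quick, self-contained illustration of that idea using only the standard library (the record function and the keys are made up for this example), mutations made in the workers go through the proxy to the manager's server process and are therefore visible in the parent:

from multiprocessing import Manager, Pool

def record(shared, key):
    # The assignment is forwarded through the proxy to the manager process.
    shared[key] = len(key)

if __name__ == '__main__':
    with Manager() as manager:
        shared = manager.dict()
        with Pool(3) as pool:
            pool.starmap(record, [(shared, 'var1'), (shared, 'var2'), (shared, 'var3')])
        print(dict(shared))  # {'var1': 4, 'var2': 4, 'var3': 4}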
Coming back, I mentioned that we can access the server process through proxies. Proxies are basically just wrappers that mimic the properties and functions of the original object. Most rely on specific dunder methods like __setattr__ and __getattr__; an example is given below (from here):
class Proxy(object):
    def __init__(self, original):
        self.original = original

    def __getattr__(self, attr):
        return getattr(self.original, attr)


class MyObj(object):
    def bar(self):
        print('bar')


obj = MyObj()
proxy = Proxy(obj)

proxy.bar()  # 'bar'
obj.bar()    # 'bar'
A huge plus of using proxies is that they are picklable, which is important when dealing with shared objects. Under the hood, the manager creates a proxy for you whenever you create a shared object through it. However, this default proxy (called AutoProxy) does not share the namespace of the object. That will not work for us, since we are using class B's namespace and want that to be shared as well. Therefore, we create our own proxy by inheriting from another, undocumented proxy provided by multiprocessing: NamespaceProxy. As the name suggests, this one does share the namespace but, conversely, does not share any instance methods. This is why we need to create our own proxy, which is the best of both worlds:
from multiprocessing.managers import NamespaceProxy
import types


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user-defined data type. The proxy instance will have the namespace and
    functions of the data type (except private/protected callables/attributes). Furthermore, the proxy will be
    picklable and its state can be shared among different processes."""

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            def wrapper(*args, **kwargs):
                return self._callmethod(name, args, kwargs)
            return wrapper
        return result
More info on why this works. Keep in mind that these proxies do not share private or protected attributes/functions (check this question).
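A small sketch of that limitation, using the plain NamespaceProxy and a made-up class C (the attribute names are purely illustrative): names starting with an underscore are resolved on the proxy object itself rather than forwarded to the managed instance.

from multiprocessing.managers import BaseManager, NamespaceProxy

class C:
    def __init__(self):
        self.public = 1
        self._private = 2

if __name__ == '__main__':
    BaseManager.register('C', C, NamespaceProxy, exposed=tuple(dir(C)))
    manager = BaseManager()
    manager.start()
    c = manager.C()
    print(c.public)  # 1 -- forwarded to the managed instance
    try:
        print(c._private)
    except AttributeError:
        # Underscore names are looked up on the proxy itself, so the managed
        # instance's _private attribute is not reachable through the proxy.
        print('_private is not exposed through the proxy')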
After we have achieved this, the rest is just some boilerplate-ish code which uses this proxy by default to create shareable complex objects for particular datatypes. In our context this means that code for class B will become this:
from multiprocessing import Manager, Queue, Pool
from multiprocessing.managers import BaseManager


class B:
    def __init__(self):
        pass

    @classmethod
    def create(cls, *args, **kwargs):
        # Register class
        class_str = cls.__name__
        BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))

        # Start a manager process
        manager = BaseManager()
        manager.start()

        # Create and return this proxy instance. Using this proxy allows sharing of state between processes.
        inst = eval("manager.{}(*args, **kwargs)".format(class_str))
        return inst
In the above code, the create function is a general class constructor which automatically uses our new proxy and a manager to share the object. It can be used for any class, not only B. The only thing left is to change the usage of the mpire pool to a multiprocessing pool in init_var. Note how we use B.create() instead of simply B() to create objects of class B:
def init_var(self):
    self.do_something('var1')
    self.do_something('test')
    print(self.model.var1)
    print(vars(self.model).keys())

    # Trying to create the attributes in parallel
    print('')
    self.model = B.create()
    self.__sets_list = [(self.model, 'var1'), (self.model, 'var2'), (self.model, 'var3')]
    with Pool(3) as pool:
        pool.starmap(self.do_something2, self.__sets_list)

    print(self.model.var1)
    print(vars(self.model).keys())
Note: I have only tested this on Windows, where multiprocessing does not use the "fork" method but rather the "spawn" method to start a process. More information here.
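If you are on Linux/macOS and want to double-check the behaviour under a particular start method (for example, forcing "spawn" to mimic Windows), the standard library lets you request one explicitly; a minimal sketch (whoami is just a throwaway helper for illustration):

import multiprocessing as mp
import os

def whoami(var):
    return (os.getpid(), var)

if __name__ == '__main__':
    # 'spawn' mimics the Windows start method; 'fork' is only available on Unix.
    ctx = mp.get_context('spawn')
    with ctx.Pool(3) as pool:
        print(pool.map(whoami, ['var1', 'var2', 'var3']))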