The documentation mentions that you can set up a Manager with nested proxy variables, but I am unable to find any examples or get this to work.
I am using Flask, which runs through an init process; the code snippet below is from that process. Each PID attempts to start the manager; if it is already started, it connects and gets the proxy variables.
Specifically, I'm connecting to Jira, pulling the versions list, and storing it. Since this is an expensive operation, I store the results in VersionsList. I also have VersionsDict, which holds variables such as the last fetch and fetching status. I would like each of these classes that needs shared state to have only one top-level proxy variable, with the other variables nested inside it, but I can't get this to work.
Here is my example code:
from multiprocessing import Lock, set_start_method
from multiprocessing.managers import (AcquirerProxy, BaseManager, DictProxy,
ListProxy)
from os import getpid
class DataManager(BaseManager):
    """Manager subclass on which the shared typeids are registered."""


# Address and credentials of the shared manager server.
IP_ADDRESS = '127.0.0.1'
PORT = 50000
AUTHKEY = b'password'

# Without this, the snippet does not work on macOS:
# https://github.com/pytest-dev/pytest-flask/issues/104
#
# This also means the code will not run on Windows, because the `fork` start
# method is only available on Unix:
# https://docs.python.org/3.8/library/multiprocessing.html#contexts-and-start-methods
set_start_method("fork")
def StartManageServer():
    """
    Create a server that will share proxy variables between PIDs.

    Registers ``get_VersionDict`` and ``get_VersionList`` on DataManager,
    tries to start a manager server at (IP_ADDRESS, PORT), and then calls
    ``connect()`` on the manager whether or not this process started it.

    Returns:
        The DataManager instance after ``connect()`` has been called.

    NOTE(review): this function never sets ``current_process().authkey``.
    The accompanying answer attributes the ``AuthenticationError`` raised when
    reading a proxy stored inside the dict proxy to exactly this omission.
    """
    VersionsList = []
    VersionsDict = {'last_updated': None, 'versions': []}
    # These lambdas close over the local list/dict and cannot be pickled,
    # which (per the accompanying answer) is why this snippet needs the
    # `fork` start method.
    DataManager.register("get_VersionDict", lambda: VersionsDict, DictProxy)
    DataManager.register("get_VersionList", lambda: VersionsList, ListProxy)
    try:
        manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
        manager.get_server()  # Raises OSError if a server is already running
        manager.start()
        # NOTE(review): `log` is not defined in this snippet; presumably a
        # module-level logger defined elsewhere -- confirm.
        log.info(f"Starting DataManager server from pid {getpid()}")
    except OSError:  # Already a server running
        log.error(
            f"DataManager server is already running, returning - PID: {getpid()}")
    finally:
        # NOTE(review): if DataManager(...) itself raises, `manager` is still
        # unbound here and this line raises UnboundLocalError.
        manager.connect()
    return manager
class ManagedVariables:
    """Class-level access point for the shared proxy variables.

    The manager is obtained once, when this class body executes, by calling
    StartManageServer().
    """

    manager = StartManageServer()

    def _Logger(func):
        """Wrap *func* so that every call first prints the requesting PID.

        Applied in the class body underneath ``@classmethod``, so *func* is the
        plain function and ``cls`` is forwarded unchanged.
        """
        from functools import wraps

        # wraps() keeps the wrapped method's __name__/__doc__; without it the
        # public classmethods would all report the name "inner".
        @wraps(func)
        def inner(cls):
            print(f"PID {getpid()} is requesting {func.__name__}")
            return func(cls)

        return inner

    @classmethod
    @_Logger
    def getVersions(cls):
        """Return the proxy registered on the manager as ``get_VersionDict``."""
        return cls.manager.get_VersionDict()

    @classmethod
    @_Logger
    def getVersionsList(cls):
        """Return the proxy registered on the manager as ``get_VersionList``."""
        return cls.manager.get_VersionList()
The StartManageServer function starts the server and registers the proxy variables. ManagedVariables connects to the server and hands out the proxy variables on request. Ideally, I'd like to put the versions-list proxy variable into the versions dict under "versions". When I attempt this, the following traceback occurs:
In [2]: v = ManagedVariables.getVersions()
In [3]: vl = ManagedVariables.getVersionsList()
In [5]: v['versions'] = vl
In [6]: v['versions']
---------------------------------------------------------------------------
AuthenticationError Traceback (most recent call last)
<ipython-input-6-6a932e5f735b> in <module>
----> 1 v['versions']
<string> in __getitem__(self, *args, **kwds)
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in _callmethod(self, methodname, args, kwds)
807
808 conn.send((self._id, methodname, args, kwds))
--> 809 kind, result = conn.recv()
810
811 if kind == '#RETURN':
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py in recv(self)
254 self._check_readable()
255 buf = self._recv_bytes()
--> 256 return _ForkingPickler.loads(buf.getbuffer())
257
258 def poll(self, timeout=0.0):
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in RebuildProxy(func, token, serializer, kwds)
931 not getattr(process.current_process(), '_inheriting', False)
932 )
--> 933 return func(token, serializer, incref=incref, **kwds)
934
935 #
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in __init__(self, token, serializer, manager, authkey, exposed, incref, manager_owned)
781
782 if incref:
--> 783 self._incref()
784
785 util.register_after_fork(self, BaseProxy._after_fork)
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in _incref(self)
835 return
836
--> 837 conn = self._Client(self._token.address, authkey=self._authkey)
838 dispatch(conn, None, 'incref', (self._id,))
839 util.debug('INCREF %r', self._token.id)
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py in Client(address, family, authkey)
511
512 if authkey is not None:
--> 513 answer_challenge(c, authkey)
514 deliver_challenge(c, authkey)
515
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py in answer_challenge(connection, authkey)
762 response = connection.recv_bytes(256) # reject large message
763 if response != WELCOME:
--> 764 raise AuthenticationError('digest sent was rejected')
765
766 #
AuthenticationError: digest sent was rejected
I have also tried putting the list into the dictionary, and that doesn't work either. Any assistance would be appreciated.
As per the linked discussion, the fix is to include this line in your StartManageServer function:
# Give this process the manager's key so that proxies rebuilt here (such as a
# list proxy read back out of the dict proxy) authenticate successfully
# instead of raising AuthenticationError.
current_process().authkey = AUTHKEY
As a side note, you can make your code work without setting the start method to `fork` at all; you just need to make sure that the callables you register with the manager are picklable (so not lambdas). Example below:
from multiprocessing import current_process
from multiprocessing.managers import (AcquirerProxy, BaseManager, DictProxy,
ListProxy)
from os import getpid
class DataManager(BaseManager):
    """BaseManager subclass on which this example registers its typeids."""


# Where the manager server listens, and the key that peers must present.
IP_ADDRESS, PORT = '127.0.0.1', 50000
AUTHKEY = b'password'
class dummy:
    """Picklable zero-argument callable that returns the object it wraps.

    Registered with the manager instead of a lambda, so the registry can be
    pickled when the server process is not started with ``fork``.
    """

    def __init__(self, data):
        # Hold on to the object; every call hands back this same object.
        self.data = data

    def __call__(self):
        wrapped = self.data
        return wrapped
def StartManageServer():
    """
    Create a server that will share proxy variables between PIDs.

    Registers ``get_VersionDict`` and ``get_VersionList`` on DataManager using
    picklable ``dummy`` callables, so the server process can be started with
    any start method, not only ``fork``. It then starts the manager server at
    (IP_ADDRESS, PORT).

    Returns:
        The started DataManager. ``connect()`` is not needed for a manager
        started in this process.

    Raises:
        OSError: if a server is already running on the address.
    """
    versions_list = dummy([])
    versions_dict = dummy({'last_updated': None, 'versions': []})
    DataManager.register("get_VersionDict", versions_dict, DictProxy)
    DataManager.register("get_VersionList", versions_list, ListProxy)
    try:
        manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
        # Probe the address: get_server() raises OSError if a server is
        # already running. The probe Server object is discarded immediately.
        manager.get_server()
        manager.start()
    finally:
        # Proxies rebuilt in this process (e.g. the list proxy stored inside
        # the dict proxy) authenticate with this process's authkey, so it must
        # match the server's key. Without this, reading a nested proxy fails
        # with AuthenticationError.
        current_process().authkey = AUTHKEY
    # manager.connect() is only required when connecting to a server that was
    # started by another manager.
    return manager
class ManagedVariables:
    """Class-level access point for the shared proxy variables.

    Call ``init_manager()`` once before using the getters.
    """

    manager = None

    @classmethod
    def init_manager(cls):
        """Start the manager server via StartManageServer() and keep it on the class."""
        cls.manager = StartManageServer()

    def _Logger(func):
        """Wrap *func* so that every call first prints the requesting PID.

        Applied in the class body underneath ``@classmethod``, so *func* is the
        plain function and ``cls`` is forwarded unchanged.
        """
        from functools import wraps

        # wraps() keeps the wrapped method's __name__/__doc__; without it the
        # public classmethods would all report the name "inner".
        @wraps(func)
        def inner(cls):
            print(f"PID {getpid()} is requesting {func.__name__}")
            return func(cls)

        return inner

    @classmethod
    @_Logger
    def getVersions(cls):
        """Return the proxy registered on the manager as ``get_VersionDict``."""
        return cls.manager.get_VersionDict()

    @classmethod
    @_Logger
    def getVersionsList(cls):
        """Return the proxy registered on the manager as ``get_VersionList``."""
        return cls.manager.get_VersionList()
if __name__ == '__main__':
    ManagedVariables.init_manager()
    versions = ManagedVariables.getVersions()
    versions_list = ManagedVariables.getVersionsList()
    # Store the list proxy inside the dict proxy, then read it back through
    # the dict proxy.
    versions['versions'] = versions_list
    print(versions['versions'])
Output
PID 135244 is requesting getVersions
PID 135244 is requesting getVersionsList
[]
Update
I assume that you are calling StartManageServer in every process that requests the shared variable. If so, each process gets a different variable, because you create a new one every time you start a manager.
The conventional way to do this is to have one server and multiple clients. The server is where you start the manager and create the shared variables, and the clients connect to that manager and request the variables. In your case, here is what server.py would look like:
from multiprocessing.managers import (BaseManager, DictProxy,
ListProxy)
class DataManager(BaseManager):
    """BaseManager subclass shared by server.py and client.py."""


# Where the manager server listens, and the key that clients must present.
IP_ADDRESS, PORT = '127.0.0.1', 50000
AUTHKEY = b'password'
def StartManageServer():
    """
    Create a server that will share proxy variables between PIDs.

    Registers ``get_VersionDict`` and ``get_VersionList`` on DataManager and
    runs the manager server in the current process at (IP_ADDRESS, PORT).
    This blocks inside ``serve_forever()`` while serving client requests.

    Raises:
        OSError: if a server is already running on the address.
    """
    versions_list = []
    versions_dict = {'last_updated': None, 'versions': []}
    # The server runs in this process, so the registered callables are never
    # pickled and unpicklable lambdas are fine here.
    DataManager.register("get_VersionDict", lambda: versions_dict, DictProxy)
    DataManager.register("get_VersionList", lambda: versions_list, ListProxy)
    manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
    # get_server() binds the address (raising OSError if it is taken);
    # serve_forever() then handles client connections.
    manager.get_server().serve_forever()
if __name__ == '__main__':
    # Running this module as a script hosts the shared variables; clients
    # connect to it rather than starting their own manager.
    print("Starting server")
    StartManageServer()
Once you have started the server, your clients do not need to start a new manager; they just need to connect to this one. Here is what client.py would look like (pay attention to the ConnectManageServer function):
from multiprocessing import current_process
from server import DataManager
from os import getpid
# Must match the address and key used by server.py.
IP_ADDRESS, PORT = '127.0.0.1', 50000
AUTHKEY = b'password'
def ConnectManageServer():
    """Connect to the already-running DataManager server and return the manager.

    Only the typeids are registered here; the callables that create the
    shared objects are registered on the server side.
    """
    for typeid in ("get_VersionDict", "get_VersionList"):
        DataManager.register(typeid)
    manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
    manager.connect()
    # Proxies rebuilt in this process (such as the list proxy read back out of
    # the dict proxy) authenticate with this process's authkey, so it has to
    # match the server's key.
    current_process().authkey = AUTHKEY
    return manager
class ManagedVariables:
    """Class-level access point for the shared proxy variables.

    Call ``init_manager()`` once to connect to the running server before
    using the getters.
    """

    manager = None

    @classmethod
    def init_manager(cls):
        """Connect via ConnectManageServer() and keep the manager on the class."""
        cls.manager = ConnectManageServer()

    def _Logger(func):
        """Wrap *func* so that every call first prints the requesting PID.

        Applied in the class body underneath ``@classmethod``, so *func* is the
        plain function and ``cls`` is forwarded unchanged.
        """
        from functools import wraps

        # wraps() keeps the wrapped method's __name__/__doc__; without it the
        # public classmethods would all report the name "inner".
        @wraps(func)
        def inner(cls):
            print(f"PID {getpid()} is requesting {func.__name__}")
            return func(cls)

        return inner

    @classmethod
    @_Logger
    def getVersions(cls):
        """Return the proxy registered on the manager as ``get_VersionDict``."""
        return cls.manager.get_VersionDict()

    @classmethod
    @_Logger
    def getVersionsList(cls):
        """Return the proxy registered on the manager as ``get_VersionList``."""
        return cls.manager.get_VersionList()
if __name__ == '__main__':
    ManagedVariables.init_manager()
    shared_dict = ManagedVariables.getVersions()
    shared_list = ManagedVariables.getVersionsList()
    shared_list.append(1)
    # Nest the list proxy inside the dict proxy and read it back.
    shared_dict['versions'] = shared_list
    print(shared_dict['versions'])
You can have as many of these client processes as you want. Any changes you make to the shared dictionary and list will automatically be visible to all other processes.