Tags: python-3.x, multiprocessing, python-multiprocessing

How to properly set up nested proxy variables with BaseManager?


The documentation mentions that you can set up a Manager with nested proxy variables, but I am unable to find any examples or get this to work.

I am using Flask, which runs through an init process; the code snippet below is from that process. Each PID attempts to start the manager. If the manager is already started, the process connects to it and gets the proxy variables.

Specifically, I'm connecting to Jira, pulling the versions list, and storing it. Since this is an expensive operation, I store the results in VersionsList. I also have VersionsDict, which holds variables such as last fetch, fetching, etc. I would like each class that requires a proxy variable to have only one large proxy variable, with the other variables nested inside it, but I can't seem to do this.

Example code I have:

from multiprocessing import Lock, set_start_method
from multiprocessing.managers import (AcquirerProxy, BaseManager, DictProxy,
ListProxy)
from os import getpid
import logging

log = logging.getLogger(__name__)

class DataManager(BaseManager): pass

IP_ADDRESS = '127.0.0.1'
PORT = 50000
AUTHKEY = b'password'

"""
If this is not set, this won't work on MacOS. https://github.com/pytest-dev/pytest-flask/issues/104
"""

set_start_method("fork")

"""
This code will not run on Windows since `fork` only runs on Unix.

https://docs.python.org/3.8/library/multiprocessing.html#contexts-and-start-methods
"""

def StartManageServer():
    """
    Create a server that will share proxy variables between PIDs
    """

    VersionsList = []
    VersionsDict = {'last_updated': None, 'versions': []}
    
    DataManager.register("get_VersionDict", lambda: VersionsDict, DictProxy)
    DataManager.register("get_VersionList", lambda: VersionsList, ListProxy)
    
    try:
        manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
        manager.get_server()  # Raises OSError if a server is already running
        manager.start()
        log.info(f"Starting DataManager server from pid {getpid()}")
    except OSError:  # Already a server running
        log.error(
            f"DataManager server is already running, returning - PID: {getpid()}")
    finally:
        manager.connect()
        return manager

class ManagedVariables:
    manager = StartManageServer()

    def _Logger(func):
        def inner(cls):
            print(f"PID {getpid()} is requesting {func.__name__}")
            return func(cls)
        return inner
    
    @classmethod
    @_Logger
    def getVersions(cls):
        return cls.manager.get_VersionDict()
    
    @classmethod
    @_Logger
    def getVersionsList(cls):
        return cls.manager.get_VersionList()

The StartManageServer function starts the server and registers the proxy variables. ManagedVariables connects to the server and hands out the proxy variables upon request. Ideally, I'm trying to find a way to put the versions list proxy variable into the version dict under "versions". When I attempt this, I get the following traceback.

In [2]: v = ManagedVariables.getVersions()
In [3]: vl = ManagedVariables.getVersionsList()
In [5]: v['versions'] = vl
In [6]: v['versions']


---------------------------------------------------------------------------
AuthenticationError                       Traceback (most recent call last)
<ipython-input-6-6a932e5f735b> in <module>
----> 1 v['versions']

<string> in __getitem__(self, *args, **kwds)

/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in _callmethod(self, methodname, args, kwds)
    807
    808         conn.send((self._id, methodname, args, kwds))
--> 809         kind, result = conn.recv()
    810
    811         if kind == '#RETURN':

/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py in recv(self)
    254         self._check_readable()
    255         buf = self._recv_bytes()
--> 256         return _ForkingPickler.loads(buf.getbuffer())
    257
    258     def poll(self, timeout=0.0):

/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in RebuildProxy(func, token, serializer, kwds)
    931         not getattr(process.current_process(), '_inheriting', False)
    932         )
--> 933     return func(token, serializer, incref=incref, **kwds)
    934
    935 #

/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in __init__(self, token, serializer, manager, authkey, exposed, incref, manager_owned)
    781
    782         if incref:
--> 783             self._incref()
    784
    785         util.register_after_fork(self, BaseProxy._after_fork)

/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py in _incref(self)
    835             return
    836
--> 837         conn = self._Client(self._token.address, authkey=self._authkey)
    838         dispatch(conn, None, 'incref', (self._id,))
    839         util.debug('INCREF %r', self._token.id)

/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py in Client(address, family, authkey)
    511
    512     if authkey is not None:
--> 513         answer_challenge(c, authkey)
    514         deliver_challenge(c, authkey)
    515

/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py in answer_challenge(connection, authkey)
    762     response = connection.recv_bytes(256)        # reject large message
    763     if response != WELCOME:
--> 764         raise AuthenticationError('digest sent was rejected')
    765
    766 #

AuthenticationError: digest sent was rejected

I've also tried putting the list into the dictionary, and that doesn't work either. Any assistance would be appreciated.


Solution

  • As per the discussion here, the fix is to include this line in your StartManageServer function:

    current_process().authkey = AUTHKEY
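
A likely reason this helps, judging from the traceback above: the nested proxy returned by v['versions'] is rebuilt from a pickle (RebuildProxy), has no manager object attached, and so appears to authenticate with the authkey of the current process instead of the manager's AUTHKEY. The minimal sketch below only shows that the process-wide key starts out as a random value and what the assignment changes; the names matches_before and matches_after are introduced here for illustration.

```python
from multiprocessing import current_process

AUTHKEY = b"password"  # same value the manager was created with

# A process starts with its own random authkey, so it does not match AUTHKEY.
matches_before = bytes(current_process().authkey) == AUTHKEY

# After the assignment, anything that falls back to the process-wide key
# (such as a proxy rebuilt from a pickle) presents the manager's key.
current_process().authkey = AUTHKEY
matches_after = bytes(current_process().authkey) == AUTHKEY

print(matches_before, matches_after)
```

The assignment has to run in every process that receives nested proxies, not only in the one that started the manager.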
    

    As a side note, you can make your code work without setting the start method to fork at all. You just need to make sure that the callables you register with the manager are picklable (so no lambdas). Example below:

    from multiprocessing import current_process
    from multiprocessing.managers import (AcquirerProxy, BaseManager, DictProxy,
                                          ListProxy)
    from os import getpid
    
    
    class DataManager(BaseManager): pass
    
    
    IP_ADDRESS = '127.0.0.1'
    PORT = 50000
    AUTHKEY = b'password'
    
    
    # Class which we will use as callable to register typeids with
    class dummy:
    
        def __init__(self, data):
            self.data = data
    
        def __call__(self):
            return self.data
    
    def StartManageServer():
        """
        Create a server that will share proxy variables between PIDs
        """
    
    
        VersionsList = dummy([])
        VersionsDict = dummy({'last_updated': None, 'versions': []})
    
        DataManager.register("get_VersionDict", VersionsDict, DictProxy)
        DataManager.register("get_VersionList", VersionsList, ListProxy)
    
        manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
        # A `return` inside `finally` would swallow the OSError, so let it propagate instead
        manager.get_server()  # Raises OSError if a server is already running
        manager.start()
        current_process().authkey = AUTHKEY
        # manager.connect()  # **This is only required when you are connecting to server started by another manager
        return manager
    
    
    class ManagedVariables:
        manager = None
    
        @classmethod
        def init_manager(cls):
            cls.manager = StartManageServer()
    
        def _Logger(func):
            def inner(cls):
                print(f"PID {getpid()} is requesting {func.__name__}")
                return func(cls)
    
            return inner
    
        @classmethod
        @_Logger
        def getVersions(cls):
            return cls.manager.get_VersionDict()
    
        @classmethod
        @_Logger
        def getVersionsList(cls):
            return cls.manager.get_VersionList()
    
    if __name__ == '__main__':
        ManagedVariables.init_manager()
        v = ManagedVariables.getVersions()
        vl = ManagedVariables.getVersionsList()
        v['versions'] = vl
        print(v['versions'])
    

    Output

    PID 135244 is requesting getVersions
    PID 135244 is requesting getVersionsList
    []
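
The picklability requirement mentioned earlier can be checked directly. When the start method is not fork (spawn has been the default on macOS since Python 3.8), starting the manager has to pickle the registered callables to hand them to the server process. This is a small sketch, not part of the original answer: Holder is a hypothetical stand-in for the dummy class, and can_pickle is a helper introduced only for this check.

```python
import pickle


class Holder:
    """Hypothetical stand-in for the `dummy` class: a callable defined at module level."""

    def __init__(self, data):
        self.data = data

    def __call__(self):
        return self.data


def can_pickle(obj):
    """Return True if pickle.dumps accepts `obj`, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


lambda_ok = can_pickle(lambda: [])
holder_ok = can_pickle(Holder([]))
print(f"lambda picklable: {lambda_ok}")
print(f"Holder instance picklable: {holder_ok}")
```

Run as a script, the lambda should be reported as not picklable and the Holder instance as picklable, which is why the class-based callable works under any start method.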
    

    Update

    I assume that you are calling StartManageServer in every process that requests the shared variable. If so, each process gets a different variable, because a new one is created every time a manager is started.

    The conventional way to do this is to have a server and multiple clients. The server is the one where you start the manager and create the shared variable, and the clients connect to that manager and request the variable. In your case, here is what the server.py will look like:

    from multiprocessing.managers import (BaseManager, DictProxy,
                                          ListProxy)
    
    
    class DataManager(BaseManager): pass
    
    
    IP_ADDRESS = '127.0.0.1'
    PORT = 50000
    AUTHKEY = b'password'
    
    def StartManageServer():
        """
        Create a server that will share proxy variables between PIDs
        """
    
        VersionsList = []
        VersionsDict = {'last_updated': None, 'versions': []}
    
        # Since we will be starting server in current process, we can use unpicklable callables like lambda
        DataManager.register("get_VersionDict", lambda: VersionsDict, DictProxy)
        DataManager.register("get_VersionList", lambda: VersionsList, ListProxy)
    
        try:
            manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
            srv = manager.get_server()
            srv.serve_forever()
        except OSError:  # Already a server running
            raise
    
    
    if __name__ == '__main__':
        print("Starting server")
        StartManageServer()
    

    Once you have started the server, your clients do not need to start a new manager. They just need to connect to the existing one. So here is what client.py will look like (pay attention to the function ConnectManageServer):

    from multiprocessing import current_process
    from server import DataManager
    from os import getpid
    
    
    IP_ADDRESS = '127.0.0.1'
    PORT = 50000
    AUTHKEY = b'password'
    
    
    def ConnectManageServer():
    
        # Register the typeids already registered by server (do not register a callable here! It's already done server-side)
        DataManager.register("get_VersionDict")
        DataManager.register("get_VersionList")
    
        # Connect to an existing manager server
        manager = DataManager(address=(IP_ADDRESS, PORT), authkey=AUTHKEY)
        manager.connect()
    
        # Set the process-wide authkey so that nested proxies received from the server can authenticate
        current_process().authkey = AUTHKEY
        return manager
    
    
    class ManagedVariables:
        manager = None
    
        @classmethod
        def init_manager(cls):
            cls.manager = ConnectManageServer()
    
        def _Logger(func):
            def inner(cls):
                print(f"PID {getpid()} is requesting {func.__name__}")
                return func(cls)
    
            return inner
    
        @classmethod
        @_Logger
        def getVersions(cls):
            return cls.manager.get_VersionDict()
    
        @classmethod
        @_Logger
        def getVersionsList(cls):
            return cls.manager.get_VersionList()
    
    if __name__ == '__main__':
        ManagedVariables.init_manager()
        v = ManagedVariables.getVersions()
        vl = ManagedVariables.getVersionsList()
        vl.append(1)
        v['versions'] = vl
        print(v['versions'])
    

    You can have as many of these client processes as you want. Any changes you make to the shared dictionary and list through their proxies will automatically be visible to all other processes.
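
To see the server/client split and the nested proxy working together without two terminals, here is a single-file sketch under several assumptions: it uses the Unix-only fork context so the lambdas stay usable, binds to port 0 so the OS picks a free port, and keeps both "clients" in one process for brevity. ServerManager, ClientManager, connect_client and run_demo are hypothetical names, not part of the code above.

```python
import multiprocessing
from multiprocessing import current_process
from multiprocessing.managers import BaseManager, DictProxy, ListProxy

AUTHKEY = b"password"


class ServerManager(BaseManager):
    """Owns the shared objects (plays the role of server.py)."""


class ClientManager(BaseManager):
    """Only knows the typeids (plays the role of client.py)."""


versions_list = []
versions_dict = {"last_updated": None, "versions": []}

# Server side: typeids with callables. Client side: typeids only.
ServerManager.register("get_VersionDict", lambda: versions_dict, DictProxy)
ServerManager.register("get_VersionList", lambda: versions_list, ListProxy)
ClientManager.register("get_VersionDict", proxytype=DictProxy)
ClientManager.register("get_VersionList", proxytype=ListProxy)


def connect_client(address):
    """Connect to an already running manager server."""
    client = ClientManager(address=address, authkey=AUTHKEY)
    client.connect()
    return client


def run_demo():
    # Assumption: "fork" is available (Unix only).
    ctx = multiprocessing.get_context("fork")
    server = ServerManager(address=("127.0.0.1", 0), authkey=AUTHKEY, ctx=ctx)
    server.start()
    try:
        # Needed so the nested proxy can be rebuilt in this process.
        current_process().authkey = AUTHKEY
        first = connect_client(server.address)
        second = connect_client(server.address)

        # The first client nests the list proxy inside the shared dict.
        vl = first.get_VersionList()
        vl.append(1)
        first.get_VersionDict()["versions"] = vl

        # The second client reads the nested proxy back through the dict.
        nested = second.get_VersionDict()["versions"]
        return list(nested)
    finally:
        server.shutdown()


if __name__ == "__main__":
    print(run_demo())
```

If everything is wired as described, the second client should see the element appended by the first one, so the script should print [1].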