Search code examples
python-3.xmultiprocessingpython-multiprocessing

Python multiprocessing manager and proxy; why does my custom class not share over a network?


So I am trying to share access to a custom class object over a network by having a host machine serve it, and multiple clients connect to and update it. It seems that the standard multiprocessing library has managers and proxies for just this usage case.

I believe from reading the multiprocessing docs (and other posts here) that on the host side I need a custom manager derived from the multiprocessing BaseManager class then register my custom class with a custom proxy derived from the multiprocessing BaseProxy class. I believe on the client side I need to connect to the manager and interact with my custom class using a proxy.

The issue I am having is that attributes that change on the host version of my custom class cannot be seen on the clients and vice versa. It seems I have done something wrong with the implementation or misunderstood the docs/source. Can someone help provide a working example and explain the solution?

Here is my minimal example.

Run manager in one terminal with:

>> python -i manager.py

from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType

IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'

# shared class
class Foo:
    """my custom shared class"""
    def __init__(self):
        self.x = None

    def get_x(self):
        return self.x

    def set_x(self, value):
        self.x = value

#  proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))

class FooProxy(FooProxyBase):
    """shared class proxy"""

    def get_x(self):
        return self._callmethod('get_x')

    def set_x(self, value):
        return self._callmethod('set_x', (value,))

# custom shared class manager
class MyBaseManager(BaseManager):
    pass

# manager, run manager server for shared class and set data
print(f'manager running on proc {os.getpid()}')

MyBaseManager.register("Foo", Foo, FooProxy)
m = MyBaseManager(address=(IP, PORT), authkey=KEY)
m.start()

print(f'manager server running on proc {m._process.pid}')
print(f'(proxy) {str(m)}')
print(f'(referant) {repr(m)}')

# get copy of managed class proxy and set value to 10
f = m.Foo()
print(f'x={f.get_x()} => should be None')
f.set_x(10) # set x value to 10
print(f'x={f.get_x()} => should be 10')

Run client(s) in other terminals(s)

 >> python -i client.py
from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType

IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'

#  proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))

class FooProxy(FooProxyBase):
    """shared class proxy"""

    def get_x(self):
        return self._callmethod('get_x')

    def set_x(self, value):
        return self._callmethod('set_x', (value,))

# custom shared class manager
class MyBaseManager(BaseManager):
    pass

# client, connect to manager server and get data from shared class
print(f'client running on proc {os.getpid()}')

MyBaseManager.register("Foo", None, FooProxy)
m = MyBaseManager(address=(IP, PORT), authkey=KEY)
m.connect()

print(f'(proxy) {str(m)}')
print(f'(referant) {repr(m)}')

# get copy of managed class proxy and get value, should be 10
f = m.Foo()
print(f'x={f.get_x()} => should be 10')

You will see that the clients(s) return x=None when it should be 10. I get no obvious exceptions raised and have tried on Ubuntu 18 and OSX and get the same behavior.

Please note I’m limited to python 3.6 and use of the standard library on my production system. I have also successfully tried other examples in the multiprocessing docs so believe it not to be a computer/networking issue.

Thanks


Solution

  • The client has a different instance of a shared Foo. You can pass that instance to other processes and see that it is working correctly. Below I've passed it to two other processes: one prints current value and changes it, the other waits a moment, then prints the value updated by the first worker:

    import multiprocessing as mp
    from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType
    import os
    import time
    
    IP = 'localhost'
    PORT = 50000
    KEY = b'abracadabra'
    
    #  proxy class
    FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))
    
    class FooProxy(FooProxyBase):
        """shared class proxy"""
    
        def get_x(self):
            return self._callmethod('get_x')
    
        def set_x(self, value):
            return self._callmethod('set_x', (value,))
    
    # custom shared class manager
    class MyBaseManager(BaseManager):
        pass
    
    def worker1(f):
        print(f'worker pid={os.getpid()} x={f.get_x()}')
        f.set_x(5)
    
    def worker2(f):
        time.sleep(1)
        print(f'worker pid={os.getpid()} x={f.get_x()}')
    
    if __name__ == '__main__':
        # client, connect to manager server and get data from shared class
        print(f'client running on proc {os.getpid()}')
    
        MyBaseManager.register("Foo", None, FooProxy)
        m = MyBaseManager(address=(IP, PORT), authkey=KEY)
        m.connect()
    
        print(f'(proxy) {str(m)}')
        print(f'(referant) {repr(m)}')
    
        # get copy of managed class proxy and get value, should be 10
        f = m.Foo()
        f.set_x(7)
        print(f'client x={f.get_x()}')
        mp.Process(target=worker1,args=(f,)).start()
        mp.Process(target=worker2,args=(f,)).start()
    

    Output from py -i client.py:

    client running on proc 11332
    (proxy) <__main__.MyBaseManager object at 0x000001EB9AC0AFA0>
    (referant) <__main__.MyBaseManager object at 0x000001EB9AC0AFA0>
    client x=7
    >>> worker pid=2560 x=7
    worker pid=11444 x=5
    

    If you want a singleton Foo, you could expose a get_foo function that gets a common global Foo:

    manager.py:

    from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType
    import os
    
    IP = 'localhost'
    PORT = 50000
    KEY = b'abracadabra'
    
    # shared class
    class Foo:
        """my custom shared class"""
        def __init__(self):
            self.x = None
    
        def get_x(self):
            return self.x
    
        def set_x(self, value):
            self.x = value
    
    #  proxy class
    FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))
    
    class FooProxy(FooProxyBase):
        """shared class proxy"""
    
        def get_x(self):
            return self._callmethod('get_x')
    
        def set_x(self, value):
            return self._callmethod('set_x', (value,))
    
    ##  Global Foo and function to get the instance.
    global_foo = Foo()
    
    def get_foo():
        return global_foo
    
    # custom shared class manager
    class MyBaseManager(BaseManager):
        pass
    
    if __name__ == '__main__':
        # manager, run manager server for shared class and set data
        print(f'manager running on proc {os.getpid()}')
    
        MyBaseManager.register("Foo", Foo, FooProxy)
        MyBaseManager.register("get_foo", get_foo)
        m = MyBaseManager(address=(IP, PORT), authkey=KEY)
        m.start()
    
        print(f'manager server running on proc {m._process.pid}')
        print(f'(proxy) {str(m)}')
        print(f'(referant) {repr(m)}')
    
        # get global instance and set value to 10
        f = m.get_foo()
        print(f'x={f.get_x()} => should be None')
        f.set_x(10) # set x value to 10
        print(f'x={f.get_x()} => should be 10')
    

    client.py:

    from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType
    import os
    
    IP = 'localhost'
    PORT = 50000
    KEY = b'abracadabra'
    
    #  proxy class
    FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))
    
    class FooProxy(FooProxyBase):
        """shared class proxy"""
    
        def get_x(self):
            return self._callmethod('get_x')
    
        def set_x(self, value):
            return self._callmethod('set_x', (value,))
    
    # custom shared class manager
    class MyBaseManager(BaseManager):
        pass
    
    # client, connect to manager server and get data from shared class
    print(f'client running on proc {os.getpid()}')
    
    MyBaseManager.register("Foo", None, FooProxy)
    MyBaseManager.register("get_foo")
    m = MyBaseManager(address=(IP, PORT), authkey=KEY)
    m.connect()
    
    print(f'(proxy) {str(m)}')
    print(f'(referant) {repr(m)}')
    
    # get global instance and get value, should be 10
    f = m.get_foo()
    print(f'x={f.get_x()} => should be 10')
    

    Output of client:

    client running on proc 7180
    (proxy) <__main__.MyBaseManager object at 0x000001A3F0267610>
    (referant) <__main__.MyBaseManager object at 0x000001A3F0267610>
    x=10 => should be 10