
Sharing asyncio objects between processes


I am working with both the asyncio and the multiprocessing library to run two processes, each with one server instance listening on different ports for incoming messages.

To identify each client, I want to share a dict between the two processes to update the list of known clients. To achieve this, I decided to use a Tuple[StreamReader, StreamWriter] lookup key which is assigned a Client object for this connection.

However, as soon as I insert or simply access the shared dict, the program crashes with the following error message:

Task exception was never retrieved
future: <Task finished name='Task-5' coro=<GossipServer.handle_client() done, defined at /home/croemheld/Documents/network/server.py:119> exception=AttributeError("Can't pickle local object 'WeakSet.__init__.<locals>._remove'")>
Traceback (most recent call last):
  File "/home/croemheld/Documents/network/server.py", line 128, in handle_client
    if not await self.handle_message(reader, writer, buffer):
  File "/home/croemheld/Documents/network/server.py", line 160, in handle_message
    client = self.syncmanager.get_api_client((reader, writer))
  File "<string>", line 2, in get_api_client
  File "/usr/lib/python3.9/multiprocessing/managers.py", line 808, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'

Naturally I looked up the error message and found this question, but I don't really understand what the reason is here. As far as I understand, the reason for this crash is that StreamReader and StreamWriter cannot be pickled/serialized in order to be shared between processes. If that is in fact the reason, is there a way to pickle them, maybe by patching the reducer function to instead use a different pickler?


Solution

  • I managed to find a workaround that keeps asyncio and multiprocessing without pulling in any additional libraries.

    First, since the StreamReader and StreamWriter objects are not picklable, I am forced to pass the underlying socket instead. This is easily achievable with a simple function:

    import socket
    from asyncio import StreamWriter

    def get_socket(writer: StreamWriter) -> socket.socket:
        fileno = writer.get_extra_info('socket').fileno()
        return socket.fromfd(fileno, socket.AF_INET, socket.SOCK_STREAM)
    

    The socket is inserted into the shared object (e.g. a Manager().dict(), or even a custom class that you have to register via a custom BaseManager instance). Now, since the application is built on asyncio and makes use of the streams provided by the library, we can easily convert the socket back into a StreamReader/StreamWriter pair via:

    node_reader, node_writer = await asyncio.open_connection(sock=self.node_sock)
    node_writer.write(mesg_text)
    await node_writer.drain()
    

    Where self.node_sock is the socket instance that was passed through the shared object.
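    To illustrate the full round trip, here is a self-contained sketch (the echo server and names like raw_sock are illustrative, not from the original server code). It extracts the raw socket from a writer, closes the original transport (the duplicated file descriptor keeps the TCP connection alive), and rewraps the socket into a fresh stream pair, just as the receiving process would after pulling it from the shared dict:

```python
import asyncio
import socket

def get_socket(writer: asyncio.StreamWriter) -> socket.socket:
    # Duplicate the connection's file descriptor into a plain socket,
    # which multiprocessing can pickle on Unix (unlike the streams).
    fileno = writer.get_extra_info('socket').fileno()
    return socket.fromfd(fileno, socket.AF_INET, socket.SOCK_STREAM)

async def main() -> bytes:
    async def handle(reader, writer):
        # Echo a single line back to the client, then close.
        data = await reader.readline()
        writer.write(b'echo: ' + data)
        await writer.drain()
        writer.close()

    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    # Connect and extract the raw socket from the writer.
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    raw_sock = get_socket(writer)

    # Close the original transport; the duplicated descriptor in
    # raw_sock keeps the underlying TCP connection open.
    writer.close()
    await writer.wait_closed()

    # Rewrap the raw socket into a fresh StreamReader/StreamWriter pair,
    # as the other process would after reading it from the shared dict.
    node_reader, node_writer = await asyncio.open_connection(sock=raw_sock)
    node_writer.write(b'hello\n')
    await node_writer.drain()
    reply = await node_reader.readline()

    node_writer.close()
    await node_writer.wait_closed()
    server.close()
    await server.wait_closed()
    return reply

reply = asyncio.run(main())
print(reply)  # b'echo: hello\n'
```

    Closing the original writer before rewrapping matters: if the old transport stayed registered with the event loop, it would race the new reader for incoming bytes on the same connection.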