I am working with both the `asyncio` and the `multiprocessing` libraries to run two processes, each with one server instance listening on a different port for incoming messages. To identify each client, I want to share a `dict` between the two processes to update the list of known clients. To achieve this, I decided to use a `Tuple[StreamReader, StreamWriter]` lookup key, which is assigned a `Client` object for that connection. However, as soon as I insert into or simply access the shared `dict`, the program crashes with the following error message:
```
Task exception was never retrieved
future: <Task finished name='Task-5' coro=<GossipServer.handle_client() done, defined at /home/croemheld/Documents/network/server.py:119> exception=AttributeError("Can't pickle local object 'WeakSet.__init__.<locals>._remove'")>
Traceback (most recent call last):
  File "/home/croemheld/Documents/network/server.py", line 128, in handle_client
    if not await self.handle_message(reader, writer, buffer):
  File "/home/croemheld/Documents/network/server.py", line 160, in handle_message
    client = self.syncmanager.get_api_client((reader, writer))
  File "<string>", line 2, in get_api_client
  File "/usr/lib/python3.9/multiprocessing/managers.py", line 808, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'
```
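For context, the setup implied by the traceback looks roughly like this; the registry class, its contents, and the manager name are my reconstruction rather than the actual code:

```python
from multiprocessing.managers import BaseManager

class ClientRegistry:
    # Reconstruction: holds the dict of known clients
    def __init__(self):
        self.clients = {}

    def get_api_client(self, key):
        return self.clients.get(key)

class GossipManager(BaseManager):
    pass

GossipManager.register('ClientRegistry', ClientRegistry)

# A proxy method call pickles its arguments to send them to the manager
# process, so get_api_client((reader, writer)) fails before it even runs.
```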
Naturally I looked up the error message and found this question, but I don't really understand what the reason is here. As far as I understand, the crash happens because `StreamReader` and `StreamWriter` cannot be pickled/serialized in order to be shared between processes. If that is in fact the reason, is there a way to pickle them, maybe by patching the reducer function to use a different pickler?
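To make the idea concrete: something along these lines, using the `register()` hook that `multiprocessing.reduction.ForkingPickler` exposes (an untested sketch; `rebuild_writer` is a placeholder, because I don't see how to reconstruct a `StreamWriter` from a bare file descriptor in the other process):

```python
from asyncio import StreamWriter
from multiprocessing.reduction import ForkingPickler

def rebuild_writer(fileno):
    # Placeholder: it is unclear how to rebuild a StreamWriter from a
    # bare file descriptor without an event loop in the target process.
    raise NotImplementedError

def reduce_writer(writer):
    # Reduce the writer to its underlying file descriptor.
    return rebuild_writer, (writer.get_extra_info('socket').fileno(),)

ForkingPickler.register(StreamWriter, reduce_writer)
```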
I managed to find a workaround while keeping the `asyncio` and `multiprocessing` libraries, without pulling in any other libraries.

First, since the `StreamReader` and `StreamWriter` objects are not picklable, I am forced to share a plain `socket` instead. This is easily achievable with a simple function:
```python
import socket
from asyncio import StreamWriter

def get_socket(writer: StreamWriter):
    # Duplicate the connection's file descriptor into a plain, picklable socket
    fileno = writer.get_extra_info('socket').fileno()
    return socket.fromfd(fileno, socket.AF_INET, socket.SOCK_STREAM)
```
The socket is inserted into the shared object (e.g. a `Manager().dict()` or even a custom class, which you have to register via a custom `BaseManager` instance); a minimal sketch of that insertion follows.
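A minimal sketch, assuming the `get_socket` helper above and keying the shared `dict` by the peer address (since the original `(reader, writer)` tuple is precisely what cannot be pickled):

```python
from multiprocessing import Manager

manager = Manager()
clients = manager.dict()  # shared dict proxy, usable from both processes

async def register_client(reader, writer):
    # Store the picklable socket under the peer address instead of
    # the unpicklable (reader, writer) tuple.
    sock = get_socket(writer)
    clients[writer.get_extra_info('peername')] = sock
```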
Now, since the application is built on `asyncio` and makes use of the streams provided by the library, we can easily convert the `socket` back to a pair of `StreamReader` and `StreamWriter` via:
```python
# Wrap the shared socket in new streams on this process's event loop
node_reader, node_writer = await asyncio.open_connection(sock=self.node_sock)
node_writer.write(mesg_text)
await node_writer.drain()
```
where `self.node_sock` is the `socket` instance that was passed through the shared object.
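Putting the pieces together, the consuming side can look roughly like this (a sketch; the `clients` proxy and the peer-address key follow the assumptions from the snippet above):

```python
import asyncio

async def forward_message(clients, peer, mesg_text: bytes):
    # Fetch the shared socket and wrap it in fresh streams bound to
    # this process's event loop.
    node_sock = clients[peer]
    node_reader, node_writer = await asyncio.open_connection(sock=node_sock)
    node_writer.write(mesg_text)
    await node_writer.drain()
```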