I have a weird problem with multiprocessing when trying to implement a communication API. When I try to store a function in a Manager.dict(), I get the error that's in the title. After some research I found that you can only pickle top-level functions (not nested functions), but the function I'm trying to pickle is a top-level function, so I don't really understand what is happening. I came up with this minimal reproducible example and managed to get the same error:
import multiprocessing as mp
from typing import Callable, Any
from uuid import uuid4
from time import sleep
def defaultInit(self):
    """Do nothing; placeholder init hook that Target installs until one is registered."""
    return None
def defaultLoop(self):
    """Do nothing; placeholder loop hook that Target installs until one is registered."""
    return None
class Event():
    """A named callback with a trigger flag and the arguments of its last trigger.

    Instances live in lists inside an ``events`` mapping, keyed by ``_name``;
    ``_id`` distinguishes several events registered under the same name.
    """

    def __init__(self, name: str, callback: Callable[..., Any]):
        """Create an untriggered event called ``name`` that will run ``callback``."""
        super().__init__()
        self._id = uuid4()  # identity used by _updateEvents to find this event's slot
        self._name = name
        self._trigger = False  # set by trigger(), cleared by execIfTriggered()
        self._callback = callback
        self._args = None  # positional arguments captured by the latest trigger()

    def _updateEvents(self, events):
        """Write this event's current state back into ``events[self._name]``.

        The list stored under ``self._name`` is read, the entry whose ``_id``
        matches is replaced by ``self``, and the list is assigned back.
        """
        for i, event in enumerate(events[self._name]):
            if event._id == self._id:
                tmp = events[self._name]
                tmp[i] = self
                # Assign the whole list back rather than mutating in place;
                # presumably needed because the mapping is a Manager dict proxy
                # whose values are copies -- confirm against the caller.
                events[self._name] = tmp
                break

    def execIfTriggered(self, owner) -> None:
        """If triggered, clear the flag, persist it, then call the callback.

        The callback receives ``owner`` followed by the stored trigger args.
        """
        if self._trigger:
            self._trigger = False
            # NOTE(review): reads ``owner.events``, but Target in this listing
            # only defines ``_events``; no ``events`` attribute is visible.
            self._updateEvents(owner.events)
            self._callback(owner, *self._args)

    def trigger(self, owner, *args) -> None:
        """Store ``args``, set the trigger flag and persist the change.

        NOTE(review): annotated ``-> None`` but the body returns ``self``.
        """
        self._args = args
        self._trigger = True
        # NOTE(review): same ``owner.events`` vs ``_events`` mismatch as above.
        self._updateEvents(owner.events)
        return self
class Target():
    """Holds an events mapping plus an init hook and a loop hook.

    ``events`` maps an event name to a list of ``Event`` objects; in this
    listing it is created with ``mgr.dict()`` by the main script.
    """

    def __init__(self, events):
        """Store ``events`` and install the do-nothing default hooks."""
        self._init = defaultInit
        self._loop = defaultLoop
        self._events = events

    def event(self, callback):
        """Register ``callback`` under its own ``__name__``; returns ``callback``."""
        name = callback.__name__
        return self.on(name, callback)

    def on(self, eventName, callback):
        """Append a new ``Event`` for ``callback`` under ``eventName``.

        Returns ``callback`` unchanged, so the method can be used as a decorator.
        """
        if eventName not in self._events:
            # This assignment is the statement both tracebacks in the question
            # report: the Event (and the function it holds) is sent to the mapping.
            self._events[eventName] = [Event(eventName, callback)]
        else:
            # Build a new list and assign it back instead of appending in place.
            self._events[eventName] = self._events[eventName] + [Event(eventName, callback)]
        return callback

    def emit(self, event, *args):
        """Trigger every ``Event`` registered under ``event`` with ``args``.

        Indexing ``self._events[event]`` is unguarded, so an unregistered name
        propagates whatever the mapping raises for a missing key.
        """
        for ev in self._events[event]:
            ev.trigger(self, *args)

    def init(self, callback):
        """Replace the init hook.

        NOTE(review): there is no return statement, so ``@t.init`` rebinds the
        decorated name to ``None``.
        """
        self._init = callback

    def loop(self, callback):
        """Replace the loop hook (also returns ``None``)."""
        self._loop = callback

    def processEvents(self):
        """Call ``execIfTriggered(self)`` on every registered event."""
        for events in self._events.values():
            for event in events:
                event.execIfTriggered(self)
def run(target):
    """Process entry point: call the init hook once, then loop forever.

    Each iteration processes triggered events and then calls the loop hook.
    The ``while True`` loop has no exit condition, so this never returns.
    """
    # NOTE(review): both hooks are called with no arguments, yet defaultInit
    # and defaultLoop each declare one required parameter and are stored as
    # plain instance attributes (not bound methods) on Target.
    target._init()
    while True:
        target.processEvents()
        target._loop()
if __name__ == '__main__':
    # Shared dictionary served by a manager process; Target stores its events in it.
    mgr = mp.Manager()
    t = Target(mgr.dict())
    proc = mp.Process(target=run, args=(t,))

    # Both handlers are defined inside the __main__ guard, so they exist only
    # when this file is executed as the main script.
    # Target.init has no return statement, so this rebinds fooInit to None.
    @t.init
    def fooInit(target):
        print('fooInit')

    # Registering foo stores an Event holding the function in the shared dict;
    # this is where the first traceback in the question is raised.
    @t.event
    def foo(target):
        print('fooEvent')

    proc.start()
    sleep(1)
    t.emit('foo')
    sleep(1)
    # run() loops forever, so this join has nothing to wait for that ever ends.
    proc.join()
Full error output:
Traceback (most recent call last):
File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 88, in <module>
def foo(target):
File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 47, in event
return self.on(name, callback)
File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 51, in on
self._events[eventName] = [Event(eventName, callback)]
File "<string>", line 2, in __setitem__
File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function foo at 0x7f13530775b0>: attribute lookup foo on __main__ failed
A thing I noticed that is even weirder is that when I use the event function in a normal way instead of the decorator way, it outputs a different error message:
# If, instead of the decorator form:
@t.event
def foo(target):
    print('fooEvent')
# ...the function is defined first and then registered with a plain call:
def foo(target):
    print('fooEvent')
t.event(foo)
I get this error message in that case:
Traceback (most recent call last):
File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 89, in <module>
t.event(foo)
File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 47, in event
return self.on(name, callback)
File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 51, in on
self._events[eventName] = [Event(eventName, callback)]
File "<string>", line 2, in __setitem__
File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/managers.py", line 833, in _callmethod
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/managers.py", line 253, in serve_client
request = recv()
File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/connection.py", line 251, in recv
return _ForkingPickler.loads(buf.getbuffer())
AttributeError: Can't get attribute 'foo' on <module '__main__' from '/home/lsuardi/smc/moscau/Acquisition_python/test.py'>
---------------------------------------------------------------------------
Which is roughly similar but not the same. Could someone explain to me what is happening and how to prevent it? The error seems to come from the fact that I'm using a Manager.dict(), but I don't know how else to share the functions between the parent and child processes.
The main issue here is that by adding a function to the shared dictionary you are pickling the function and sending it to the manager, which resides in a separate process of its own. Pickling a function does not serialize the actual code; it serializes a reference that is imported again on "un-pickling". Putting a function inside `if __name__ == "__main__":` specifically prevents it from being defined on import, and only allows it to be defined when the script is run as the "main" file. This conflicts with your desire to create your class instance `t` inside the import guard (which is generally a good idea: don't create state on import, only definitions) while at the same time using the function decorator syntax to pass functions to the instance. The solution is to give up on the decorator syntax, define the functions at module level, and just pass them to your instance normally. After that there were a few other issues I had to fix to get the example running, which are detailed in a few comments:
import multiprocessing as mp
from typing import Callable, Any
from uuid import uuid4
from time import sleep
def defaultInit(self=None):
    """Do nothing; default init hook used until ``Target.init`` replaces it.

    ``self`` is optional because ``run`` invokes the init hook with no
    arguments (``target._init()``); with a required parameter the default
    hook raised ``TypeError`` whenever no init callback had been registered.
    """
    return None
def defaultLoop(self):
    """Do nothing; default loop hook used until ``Target.loop`` replaces it."""
    return None
class Event():
    """A named callback with a trigger flag and the arguments of its last trigger.

    Instances live in lists inside an events mapping, keyed by ``_name``;
    ``_id`` distinguishes several events registered under the same name.
    Every state change is written back to that mapping via ``_updateEvents``.
    """

    def __init__(self, name: str, callback: Callable[..., Any]):
        """Create an untriggered event called ``name`` that will run ``callback``."""
        super().__init__()
        self._id = uuid4()  # identity used to find this event's slot in the list
        self._name = name
        self._trigger = False  # set by trigger(), cleared by execIfTriggered()
        self._callback = callback
        self._args = None  # positional arguments captured by the latest trigger()

    def _updateEvents(self, events) -> None:
        """Replace this event's entry in ``events[self._name]`` with ``self``.

        The list is fetched exactly once, patched, and assigned back as a
        whole. A single fetch avoids a second lookup on the mapping (a round
        trip when it is a Manager dict proxy) and keeps the index found by
        the search and the list being patched in agreement.
        """
        siblings = events[self._name]
        for index, candidate in enumerate(siblings):
            if candidate._id == self._id:
                siblings[index] = self
                # Assign the list back so the mapping itself records the change.
                events[self._name] = siblings
                break

    def execIfTriggered(self, owner) -> None:
        """If triggered, clear the flag, persist it, then call the callback.

        The callback is invoked as ``callback(owner, *args)`` with the
        arguments stored by the last ``trigger`` call.
        """
        if not self._trigger:
            return
        self._trigger = False
        # The owner keeps its mapping in the private ``_events`` attribute
        # (``_events``, not ``events``).
        self._updateEvents(owner._events)
        self._callback(owner, *self._args)

    def trigger(self, owner, *args) -> "Event":
        """Store ``args``, set the trigger flag and persist the change.

        Returns:
            This event, so calls can be chained.
        """
        self._args = args
        self._trigger = True
        self._updateEvents(owner._events)  # ``_events``, not ``events``
        return self
class Target():
    """Holds an events mapping plus an init hook and a loop hook.

    ``events`` maps an event name to a list of ``Event`` objects. The hooks
    are stored as plain attributes and invoked by ``run``.
    """

    def __init__(self, events):
        """Store ``events`` and install the do-nothing default hooks."""
        self._init = defaultInit
        self._loop = defaultLoop
        self._events = events

    def event(self, callback):
        """Register ``callback`` under its own ``__name__``; returns ``callback``."""
        return self.on(callback.__name__, callback)

    def on(self, eventName, callback):
        """Append a new ``Event`` for ``callback`` under ``eventName``.

        A new list is built and assigned back (rather than appended to in
        place) so the mapping itself records the change. Returns ``callback``
        unchanged.
        """
        registered = self._events.get(eventName, [])
        self._events[eventName] = registered + [Event(eventName, callback)]
        return callback

    def emit(self, event, *args):
        """Trigger every ``Event`` registered under ``event`` with ``args``.

        Raises:
            KeyError: if nothing was registered under ``event``.
        """
        for ev in self._events[event]:
            ev.trigger(self, *args)

    def init(self, callback):
        """Replace the init hook and return ``callback``.

        Returning the callback (like ``on``/``event`` do) keeps the function's
        name bound when this method is used as a decorator.
        """
        self._init = callback
        return callback

    def loop(self, callback):
        """Replace the loop hook and return ``callback`` (see ``init``)."""
        self._loop = callback
        return callback

    def processEvents(self):
        """Call ``execIfTriggered(self)`` on every registered event."""
        for events in self._events.values():
            for event in events:
                event.execIfTriggered(self)
def run(target):
    """Process entry point: call the init hook once, then poll forever.

    Each pass runs the target's triggered events and then its loop hook.
    There is no exit condition, so this function only ends if a hook raises.
    """
    # The init hook is invoked bare, with no arguments.
    target._init()
    while True:
        target.processEvents()
        # The loop hook is handed the target explicitly; the default loop
        # hook declares one required parameter.
        target._loop(target)
def fooInit():
    """Demo init hook; takes no parameters because ``run`` calls ``target._init()`` bare."""
    message = 'fooInit'
    print(message)
def foo(target: Target, *args):
    """Demo event handler: print the arguments passed through ``emit``.

    The signature mirrors how events invoke callbacks: the owning target
    first, then the positional arguments given when the event was triggered.
    """
    line = f'fooEvent {args}'
    print(line)
if __name__ == '__main__':
    # Shared dictionary served by a manager process; Target stores its events in it.
    mgr = mp.Manager()
    t = Target(mgr.dict())
    proc = mp.Process(target=run, args=(t,))
    # Register module-level functions with plain calls: they are stored by
    # reference, so they must be importable outside the __main__ guard.
    t.init(fooInit)
    t.event(foo)
    proc.start()
    sleep(1)
    t.emit('foo', 1, 2, 3)  # added args for demonstration
    sleep(1)
    # run() loops forever, so a bare join() would block indefinitely;
    # stop the worker explicitly, then reap it.
    proc.terminate()
    proc.join()