Search code examples
python multiprocessing pickle

_pickle.PicklingError: Can't pickle <function foo at 0x7f13530775b0>: attribute lookup foo on __main__ failed


I have a weird problem with multiprocessing while trying to implement a communication API. When I try to store a function in a Manager.dict(), I get the error in the title. After some research I found that you can only pickle top-level functions (not nested ones), but the function I'm trying to pickle is a top-level function, so I don't really understand what is happening. I came up with this minimal reproducible example, which produces the same error:

import multiprocessing as mp
from typing import Callable, Any
from uuid import uuid4
from time import sleep

# No-op placeholders that Target.__init__ installs as its init/loop hooks until
# Target.init() / Target.loop() replace them.
# NOTE(review): both declare one positional parameter, but run() in this
# listing calls target._init() and target._loop() with no arguments.
def defaultInit(self): pass
def defaultLoop(self): pass

class Event():
  """A named callback together with a 'triggered' flag and pending arguments.

  Instances are kept in lists stored under their name in an events mapping;
  `_updateEvents` writes the instance back into that mapping after its state
  changes.
  """

  def __init__(self, name: str, callback: Callable[..., Any]):
    """Create an untriggered event called `name` that will invoke `callback`."""
    super().__init__()
    self._id = uuid4()  # used by _updateEvents to find this event's stored entry
    self._name = name
    self._trigger = False
    self._callback = callback
    self._args = None  # set by trigger(); unpacked by execIfTriggered()

  def _updateEvents(self, events):
    """Replace the stored entry with the same `_id` in `events[self._name]` by `self`.

    The list is read, modified, and then assigned back to the key; nothing is
    written when no entry with a matching `_id` is found.
    """
    for i, event in enumerate(events[self._name]):
      if event._id == self._id:
        tmp = events[self._name]
        tmp[i] = self
        events[self._name] = tmp
        break

  def execIfTriggered(self, owner) -> None:
    """If triggered, clear the flag, store the new state, then call the callback.

    The callback receives `owner` followed by the arguments saved by `trigger`.
    """
    if self._trigger:
      self._trigger = False
      # NOTE(review): reads `owner.events`, whereas Target in this listing
      # only assigns `_events`.
      self._updateEvents(owner.events)
      self._callback(owner, *self._args)

  def trigger(self, owner, *args) -> None:
    """Record `args`, mark the event as triggered, and store the new state.

    NOTE(review): annotated `-> None` but the body returns `self`.
    """
    self._args = args
    self._trigger = True
    # NOTE(review): same `owner.events` vs `_events` mismatch as above.
    self._updateEvents(owner.events)
    return self


class Target():
  """Holds an init hook, a loop hook, and a mapping of event name -> list of Event."""

  def __init__(self, events):
    """Store `events` as the event mapping and install the no-op default hooks."""
    self._init = defaultInit
    self._loop = defaultLoop
    self._events = events
  
  def event(self, callback):
    """Register `callback` under its own `__name__`; returns what `on` returns."""
    name = callback.__name__
    return self.on(name, callback)

  def on(self, eventName, callback):
    """Append a new Event for `callback` to the list stored under `eventName`.

    A new list is assigned to the key in both branches (the existing list is
    never mutated in place). Returns `callback`.
    """
    if eventName not in self._events:
      self._events[eventName] = [Event(eventName, callback)]
    else:
      self._events[eventName] = self._events[eventName] + [Event(eventName, callback)]
    return callback

  def emit(self, event, *args):
    """Call `trigger(self, *args)` on every Event stored under `event`.

    Indexes the mapping directly, so an unregistered name is not handled here.
    """
    for ev in self._events[event]:
      ev.trigger(self, *args)

  def init(self, callback):
    """Install `callback` as the init hook.

    NOTE(review): nothing is returned, so using this as a decorator rebinds
    the decorated name to None.
    """
    self._init = callback

  def loop(self, callback):
    """Install `callback` as the loop hook (also returns nothing)."""
    self._loop = callback

  def processEvents(self):
    """Call `execIfTriggered(self)` on every stored Event."""
    for events in self._events.values():
      for event in events:
        event.execIfTriggered(self)

def run(target):
  """Process entry point: call the init hook once, then poll events forever.

  NOTE(review): both hooks are invoked with no arguments here, while
  defaultInit/defaultLoop and fooInit in this listing each declare one
  positional parameter.
  """
  target._init()
  while True:  # no exit condition in this listing
    target.processEvents()
    target._loop()


if __name__ == '__main__':
  mgr = mp.Manager()
  t = Target(mgr.dict())  # the event mapping is a manager-backed dict proxy
  proc = mp.Process(target=run, args=(t,))

  # Target.init has no return statement, so after decoration the name
  # `fooInit` is bound to None; the function itself is kept in t._init.
  @t.init
  def fooInit(target):
    print('fooInit')

  # t.event -> t.on stores an Event holding `foo` in the manager dict.
  # According to the traceback quoted after this listing, this is the
  # statement that raises the PicklingError.
  @t.event
  def foo(target):
    print('fooEvent')

  proc.start()
  sleep(1)
  t.emit('foo')
  sleep(1)
  proc.join()  # run() loops forever, so this waits on a child that never returns

Full error output:

Traceback (most recent call last):
  File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 88, in <module>
    def foo(target):
  File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 47, in event
    return self.on(name, callback)
  File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 51, in on
    self._events[eventName] = [Event(eventName, callback)]
  File "<string>", line 2, in __setitem__
  File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function foo at 0x7f13530775b0>: attribute lookup foo on __main__ failed

Even weirder, when I call the event function normally instead of using it as a decorator, I get a different error message:

# if instead of this
  @t.event
  def foo(target):
    print('fooEvent')
# I do this
  def foo(target):
    print('fooEvent')
  t.event(foo)

I get this error message in that case:

Traceback (most recent call last):
  File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 89, in <module>
    t.event(foo)
  File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 47, in event
    return self.on(name, callback)
  File "/home/lsuardi/smc/moscau/Acquisition_python/test.py", line 51, in on
    self._events[eventName] = [Event(eventName, callback)]
  File "<string>", line 2, in __setitem__
  File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/managers.py", line 833, in _callmethod
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/managers.py", line 253, in serve_client
    request = recv()
  File "/home/lsuardi/miniconda3/envs/SMC-lsuardi/lib/python3.10/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
AttributeError: Can't get attribute 'foo' on <module '__main__' from '/home/lsuardi/smc/moscau/Acquisition_python/test.py'>
---------------------------------------------------------------------------

This is roughly similar, but not the same. Could someone explain what is happening and how to prevent it? The error seems to come from the fact that I'm using a Manager.dict(), but I don't know how else to share the functions between the parent and child processes.


Solution

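  • The main issue here is that by adding a function to the shared dictionary you are pickling the function and sending it to the manager, which resides in a separate process of its own. Pickling a function does not serialize the actual code; it serializes a reference (module name plus function name) which is looked up again on "un-pickling". Putting a function inside if __name__ == "__main__": specifically prevents it from being defined on import, and only allows it to be defined when the script is run as the "main" file. This conflicts with your desire to create your class instance t inside the import guard (which is generally a good idea: don't create state on import, only definitions) while at the same time using the function decorator syntax to pass functions to the instance. This also explains why you see two different errors. With the decorator syntax, t.event(foo) runs before the name foo has been bound in __main__, so the pickler in the parent process cannot find __main__.foo and raises PicklingError. With the plain call, foo is already bound in the parent, so pickling succeeds, but the manager process was started before foo was defined, so un-pickling on the manager side fails with AttributeError. The solution is to give up on the decorator syntax, define the functions at module level (above the import guard), and just pass them to your instance normally. After that there were a few other issues I had to fix to get the example running, which are detailed in a few comments: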

    import multiprocessing as mp
    from typing import Callable, Any
    from uuid import uuid4
    from time import sleep
    
    def defaultInit(self=None):
      """No-op init hook used until Target.init() installs a real one.

      ``self`` is optional because run() invokes the init hook with no
      arguments (``target._init()``); a required parameter would raise
      TypeError whenever no init callback has been registered.
      """

    def defaultLoop(self=None):
      """No-op loop hook used until Target.loop() installs a real one.

      Accepts the target that run() passes in; the argument is optional so
      both default hooks can be called the same way.
      """
    
    class Event():
      """A named callback together with a 'triggered' flag and pending arguments.

      Instances are kept in lists stored under their name in the owner's
      ``_events`` mapping. After every state change the instance writes itself
      back into that mapping (see ``_updateEvents``) so that other holders of
      the mapping observe the new state.
      """

      def __init__(self, name: str, callback: Callable[..., Any]):
        """Create an untriggered event called ``name`` that invokes ``callback``."""
        super().__init__()
        self._id = uuid4()  # lets _updateEvents find this event's stored entry
        self._name = name
        self._trigger = False
        self._callback = callback
        self._args = None  # set by trigger(); unpacked by execIfTriggered()

      def _updateEvents(self, events):
        """Replace the stored entry with the same ``_id`` in ``events[self._name]``.

        The list is fetched once, modified, and assigned back to its key.
        The reassignment is deliberate: with a manager dict proxy an in-place
        change to the fetched list would not reach the shared copy. Nothing is
        written when no entry with a matching ``_id`` exists.
        """
        stored = events[self._name]  # single lookup (one round trip on a proxy)
        for i, event in enumerate(stored):
          if event._id == self._id:
            stored[i] = self
            events[self._name] = stored
            break

      def execIfTriggered(self, owner) -> None:
        """Run the callback once if the event has been triggered.

        The flag is cleared and stored before the callback runs, so the same
        trigger is not executed twice. The callback receives ``owner``
        followed by the arguments saved by ``trigger``.
        """
        if self._trigger:
          self._trigger = False
          self._updateEvents(owner._events) #typo _events not events
          self._callback(owner, *self._args)

      def trigger(self, owner, *args) -> "Event":
        """Save ``args``, mark the event as triggered, and store the new state.

        Returns ``self`` so calls can be chained.
        """
        self._args = args
        self._trigger = True
        self._updateEvents(owner._events) #typo _events not events
        return self
    
    
    class Target():
      """Holds an init hook, a loop hook, and a mapping of event name -> Events.

      ``events`` is any dict-like object; the example passes a manager dict
      proxy so that the mapping is shared between processes.
      """

      def __init__(self, events):
        """Store ``events`` and install the no-op default hooks."""
        self._init = defaultInit
        self._loop = defaultLoop
        self._events = events

      def event(self, callback):
        """Register ``callback`` under its own ``__name__``; returns ``callback``."""
        return self.on(callback.__name__, callback)

      def on(self, eventName, callback):
        """Append a new Event for ``callback`` under ``eventName``.

        A new list is always assigned to the key rather than mutating the
        stored one in place, so the change is visible through a dict proxy.
        Returns ``callback``.
        """
        registered = self._events.get(eventName, [])
        self._events[eventName] = registered + [Event(eventName, callback)]
        return callback

      def emit(self, event, *args):
        """Trigger every Event stored under ``event`` with ``args``.

        Raises:
          KeyError: if nothing has been registered under ``event``.
        """
        for ev in self._events[event]:
          ev.trigger(self, *args)

      def init(self, callback):
        """Install ``callback`` as the init hook and return it.

        Returning the callback matches ``event``/``on`` and keeps the
        registered function bound to its name if this is used as a decorator.
        """
        self._init = callback
        return callback

      def loop(self, callback):
        """Install ``callback`` as the loop hook and return it."""
        self._loop = callback
        return callback

      def processEvents(self):
        """Run ``execIfTriggered`` on every stored Event."""
        for events in self._events.values():
          for event in events:
            event.execIfTriggered(self)
    
    def run(target):
      """Process entry point: call the init hook once, then poll events forever.

      Each iteration executes any triggered events and then calls the loop
      hook with ``target``. The loop has no exit condition, so the function
      returns only if one of the calls raises.
      """
      target._init()  # the init hook is invoked with no arguments
      while True:
        target.processEvents()
        target._loop(target) #missing "self" required by default loop
    
    def fooInit(): #removed unused args which were not used nor provided in run: target._init()
      """Example init hook; defined at module level, outside the import guard."""
      print('fooInit')
    
    def foo(target: Target, *args): #edited args to match function signature implied by emit
      """Example event callback: receives the Target and the arguments given to emit()."""
      print(f'fooEvent {args}') #did something with args to demonstrate 
    
    if __name__ == '__main__':
      mgr = mp.Manager()
      t = Target(mgr.dict())  # the event mapping is shared through the manager
      proc = mp.Process(target=run, args=(t,))

      # Register the callbacks before starting the child: the init hook is a
      # plain attribute of `t`, so the child only sees what is set by then.
      t.init(fooInit)

      t.event(foo)

      proc.start()
      sleep(1)
      t.emit('foo', 1,2,3) #added args for demonstration
      sleep(1)
      # run() never returns on its own, so a bare join() would block forever;
      # stop the child explicitly before waiting for it.
      proc.terminate()
      proc.join()