python, pickle, python-multiprocessing

Change default multiprocessing unpickler class


I have a multiprocessing program on Device A that uses a queue and a SyncManager to make the queue accessible over the network. The queue holds instances of a custom class from a top-level module on that device. The multiprocessing package pickles them automatically, and the pickle refers to the class as module.class.

Another device reads the queue via a SyncManager. There, the same module is part of a package instead of being top-level as it is on Device A. As a result, reading an item from the queue raises a ModuleNotFoundError, because the unpickler doesn't know that the module is now package.module.
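The error follows from how pickle handles classes: it stores them by reference (the class's module path plus its name), and the unpickler re-imports that module by name on the receiving side. The sketch below simulates both devices in a single process. The names old_module_name and Item are hypothetical stand-ins, and the module is faked with types.ModuleType instead of a real file.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the top-level module that exists on Device A.
old_mod = types.ModuleType("old_module_name")


class Item:
    def __init__(self, value):
        self.value = value


# Make the class look as if it were defined in old_module_name.
Item.__module__ = "old_module_name"
old_mod.Item = Item
sys.modules["old_module_name"] = old_mod

# Device A: the pickle records the module path, not the class code.
payload = pickle.dumps(Item(42))
print(b"old_module_name" in payload)  # True

# Device B: the old top-level name is not importable any more.
del sys.modules["old_module_name"]
error = None
try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    error = exc
print(error)  # No module named 'old_module_name'
```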

I've seen this work-around, which uses a new class based on pickle.Unpickler and seems the least hacky and most extensible: https://stackoverflow.com/a/53327348/5683049 However, I don't know how to tell multiprocessing which unpickler class to use.
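The linked work-around overrides pickle.Unpickler.find_class, the hook the unpickler calls for every module/name pair it has to resolve. Here is a minimal sketch of that idea driven by a rename table. The names RENAMES, renamed_loads, old_module_name, new_package.module_name and Item are all hypothetical, and both modules are faked in sys.modules so the example runs in one process.

```python
import io
import pickle
import sys
import types

# Hypothetical rename table: old top-level module -> new package path.
RENAMES = {"old_module_name": "new_package.module_name"}


class RenameUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        # Redirect renamed modules, then fall back to the normal lookup.
        return super().find_class(RENAMES.get(module, module), name)


def renamed_loads(data):
    return RenameUnpickler(io.BytesIO(data)).load()


class Item:
    def __init__(self, value):
        self.value = value


# "Device A": Item lives in a top-level module when it is pickled.
Item.__module__ = "old_module_name"
old_mod = types.ModuleType("old_module_name")
old_mod.Item = Item
sys.modules["old_module_name"] = old_mod
payload = pickle.dumps(Item(42))
del sys.modules["old_module_name"]

# "Device B": the same class is only importable as new_package.module_name.
new_pkg = types.ModuleType("new_package")
new_mod = types.ModuleType("new_package.module_name")
new_mod.Item = Item
new_pkg.module_name = new_mod
sys.modules["new_package"] = new_pkg
sys.modules["new_package.module_name"] = new_mod

obj = renamed_loads(payload)
print(type(obj).__name__, obj.value)  # Item 42
```

Plain pickle.loads(payload) would still fail here. Only the subclass knows about the rename, which is why the remaining question is how to make multiprocessing use it.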

I can see that the reducer class can be replaced, so I assume there is also a way to set the unpickler?
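One reason there is no separate unpickler setting: in CPython's multiprocessing.reduction, ForkingPickler is a pickle.Pickler subclass that adds a registry of custom reducers for dumping, while its loads attribute is simply the module-level pickle.loads. The quick check below relies on that CPython implementation detail, which is not documented API.

```python
import pickle
from multiprocessing.reduction import ForkingPickler

# Dumping goes through ForkingPickler and its extra reducer table...
print(issubclass(ForkingPickler, pickle.Pickler))  # True
# ...but loading is the standard pickle.loads, with no unpickler class to swap.
print(ForkingPickler.loads is pickle.loads)  # True
```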


Solution

  • Using a mixture of:

    I was able to get this working using code similar to the following:

    import io
    import multiprocessing
    import pickle
    from multiprocessing.reduction import AbstractReducer, ForkingPickler

    class RenameUnpickler(pickle.Unpickler):
        def find_class(self, module, name):
            # Redirect the old top-level module to its new package location.
            renamed_module = module
            if module == "old_module_name":
                renamed_module = "new_package.module_name"
            return super().find_class(renamed_module, name)

    class MyForkingPickler(ForkingPickler):

        # Method signature from pickle._loads. multiprocessing calls
        # ForkingPickler.loads(buf) on the class itself, hence the classmethod.
        @classmethod
        def loads(cls, s, /, *, fix_imports=True, encoding="ASCII",
                  errors="strict", buffers=None):
            if isinstance(s, str):
                raise TypeError("Can't load pickle from unicode string")
            file = io.BytesIO(s)
            return RenameUnpickler(file, fix_imports=fix_imports, buffers=buffers,
                                   encoding=encoding, errors=errors).load()

    class MyPickleReducer(AbstractReducer):
        ForkingPickler = MyForkingPickler
        register = MyForkingPickler.register

    # Install the custom reducer only after the classes above are defined.
    multiprocessing.context._default_context.reducer = MyPickleReducer()
    

    This is useful if you want further control over how unpickling is performed, but for my original case it is probably simpler to alias the old module name in sys.modules before anything is read from the queue:

    import sys
    from new_package import module_name
    sys.modules['old_module_name'] = module_name
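The sys.modules alias works because the unpickler resolves old_module_name through the import system, and the import system checks sys.modules first. The following self-contained sketch fakes the modules with types.ModuleType (old_module_name, new_package.module_name and Item are hypothetical). In real code the alias must be registered on the reading device before the first item is unpickled.

```python
import pickle
import sys
import types


class Item:
    def __init__(self, value):
        self.value = value


# "Device A": pickle Item while it lives in a top-level module.
Item.__module__ = "old_module_name"
old_mod = types.ModuleType("old_module_name")
old_mod.Item = Item
sys.modules["old_module_name"] = old_mod
payload = pickle.dumps(Item(7))
del sys.modules["old_module_name"]

# "Device B": the class is only available as new_package.module_name.
module_name = types.ModuleType("new_package.module_name")
module_name.Item = Item

# The alias: any lookup of the old name now finds the new module.
sys.modules["old_module_name"] = module_name
obj = pickle.loads(payload)
print(type(obj).__name__, obj.value)  # Item 7
```

Unlike the custom reducer, this needs no changes to multiprocessing internals, but it only covers renames that map a whole module onto another one.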