python multiprocessing pickle

Can't pickle <class '__main__.JobQueueManager'>


I am running into a pickling problem in this code (also attached below). I have read the relevant posts [1] [2], but I cannot find anything in them that applies to my case. Could you please explain this error or suggest a solution?

This is the error, followed by the part of the code that raises it:

pickle.PicklingError: Can't pickle <class '__main__.JobQueueManager'>: it's not found as __main__.JobQueueManager

Thanks!

def make_server_manager(port, authkey):

    job_q = Queue.Queue()
    result_q = Queue.Queue()

    class JobQueueManager(SyncManager):
        pass

    JobQueueManager.register('get_job_q', callable=lambda: job_q)
    JobQueueManager.register('get_result_q', callable=lambda: result_q)

    manager = JobQueueManager(address=('', port), authkey=authkey)
    manager.start()
    print 'Server started at port %s' % port
    return manager

PS: Python 2.7.7, Win 7


Solution

  • As best as I can tell, to make this pattern work on Windows, you need to create a picklable Queue.Queue. Windows has no fork, so manager.start() has to pickle everything it passes to the server process, including the manager class and the registered callables. You can make the queue picklable by subclassing Queue and defining __getstate__ and __setstate__, so that only the state that actually needs to travel between processes is pickled and the rest (the unpicklable internal locks) is left out.

    The other changes are to move the custom Manager class definition to the top level of the module, and to stop using lambda functions as the callable argument. Instead, use a functools.partial around a top-level function, because that can be pickled. Here's the final code:

    import sys
    from multiprocessing.managers import SyncManager
    from functools import partial
    import multiprocessing
    from Queue import Queue as _Queue
    
    class Queue(_Queue):
        """ A picklable queue. """   
        def __getstate__(self):
            # Only pickle the state we care about
            return (self.maxsize, self.queue, self.unfinished_tasks)
    
        def __setstate__(self, state):
            # Re-initialize the object, then overwrite the default state with
            # our pickled state.
            Queue.__init__(self)
            self.maxsize = state[0]
            self.queue = state[1]
            self.unfinished_tasks = state[2]
    
    
    def get_q(q):
        return q
    
    class JobQueueManager(SyncManager):
        pass
    
    
    def make_server_manager(port, authkey):
        job_q = Queue()
        result_q = Queue()
    
        job_q.put("hey")
        JobQueueManager.register('get_job_q', callable=partial(get_q, job_q))
        JobQueueManager.register('get_result_q', callable=partial(get_q, result_q))
    
        manager = JobQueueManager(address=('', port), authkey=authkey)
        #manager.start()
        print('Server started at port %s' % port)
        return manager
    
    def make_client_manager(port, authkey):
        JobQueueManager.register('get_job_q')
        JobQueueManager.register('get_result_q')
        manager = JobQueueManager(address=('localhost', port), authkey=authkey)
        manager.connect()
        queue = manager.get_job_q()
        print("got queue {}".format(queue))
        print(queue.get_nowait())
    
    if __name__ == "__main__":
        if len(sys.argv) > 1 and sys.argv[1] == "--client":
            make_client_manager(50000, 'abcdefg')
        else:
            manager = make_server_manager(50000, "abcdefg")
            server = manager.get_server()
            server.serve_forever()