Subclassing multiprocessing.queues.Queue

I had this working previously, but after a recent full OS update, which likely also updated my python3 install, it no longer works. I'm trying to subclass the multiprocessing.queues.Queue class, but I keep getting errors that methods and attributes are missing. Here's what used to work:

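import multiprocessing
import multiprocessing.queues  # the queues submodule has to be imported explicitly

class q_class(multiprocessing.queues.Queue):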
    def __init__(self):
        # ... my own init stuff goes here
        super(q_class, self).__init__(ctx=multiprocessing.get_context())
    def put(self,message):
        # ... my own put stuff goes here
        super(q_class, self).put(message)
    def get(self):
        # ... my own get stuff goes here
        return super(q_class, self).get()  # return the message to the caller
    def __getstate__(self):
        multiprocessing.context.assert_spawning(self) #is this necessary?
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid,
                self.report_bottlenecks, self.bottleneck_time, self.debug)
    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid,
         self.report_bottlenecks, self.bottleneck_time,self.debug) = state
        #self._after_fork() #is this necessary?

But when I call .get(), I get an error that the _poll() method doesn't exist. Peeking at the SimpleQueue class in the multiprocessing source, I see that there, at least, it was necessary to define _poll() explicitly, so I copied that line into my class. That produced an error about _poll() needing a timeout value, which is weird because I thought the reader class's _poll() defaulted to timeout=None. Whatever; I got past it by defining a custom method:

    def _poll(self,timeout=.01):

That gets past the timeout error, but now when I try to put a message on the queue I get an error that there is no _closed attribute. Looking at the multiprocessing.queues.Queue source again, this attribute should be created when self._reset() is called within its __init__(), which in turn should have been called at the end of my subclass's __init__(). Clearly that hasn't happened, so I'm now thinking I must misunderstand how to subclass in the first place (or, at least, my understanding has become outdated). Any help would be greatly appreciated!


  • Well, you see that final line that was commented out?

            #self._after_fork() #is this necessary?

    Yeah, it was necessary. Uncommenting makes the code work. 🤦
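
A possible explanation, offered as a sketch under assumptions: when a queue is pickled to be handed to a child process (for example under the spawn start method), the child rebuilds it through `__setstate__` and never runs `__init__`, so anything `__init__` sets up per process, such as `_poll` and `_closed`, has to be rebuilt there as well. In current CPython versions the parent's `__setstate__` does this itself, so one way to avoid copying the private attribute list is to delegate to it and carry only the extra attributes alongside. `TaggedQueue` and `debug` are illustrative names, not part of the original code, and the approach relies on private `multiprocessing.queues.Queue` behavior that may change between Python versions.

```python
import multiprocessing
import multiprocessing.queues


class TaggedQueue(multiprocessing.queues.Queue):
    """Hypothetical Queue subclass carrying one extra attribute, `debug`."""

    def __init__(self, debug=False, ctx=None):
        # Set the subclass's own attributes, then let the parent build the
        # pipe, locks and per-process state.
        self.debug = debug
        super().__init__(ctx=ctx or multiprocessing.get_context())

    def __getstate__(self):
        # Reuse the parent's state tuple instead of listing its private
        # attributes by hand, and append the subclass's own attributes.
        return (super().__getstate__(), self.debug)

    def __setstate__(self, state):
        parent_state, self.debug = state
        # The parent's __setstate__ restores the pipe and locks and then
        # rebuilds the per-process attributes (assumption: this holds for
        # the CPython version in use).
        super().__setstate__(parent_state)
```

Because the parent's `__getstate__` still calls `assert_spawning`, a queue like this can only be pickled while it is being handed to a child process, for example as a `Process` argument.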