
Python class missing attribute that was initialized?


I am trying to create a class, based on the multiprocessing Queue, that keeps count of the items it holds:

import multiprocessing
from multiprocessing import Value
from multiprocessing.queues import Queue

class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/

    """

    def __init__(self, n = 0):
        self.count = Value('i', n)

    def increment(self, n = 1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value


class CounterQueue(Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().

    """

    def __init__(self, *args, **kwargs):
        self.size = SharedCounter(0)
        super(CounterQueue, self).__init__(ctx=multiprocessing.get_context(), *args, **kwargs)

    def put(self, *args, **kwargs):
        self.size.increment(1)
        super(CounterQueue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        self.size.increment(-1)
        return super(CounterQueue, self).get(*args, **kwargs)

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()

    def clear(self):
        """ Remove all elements from the Queue. """
        while not self.empty():
            self.get()

However, when I pass this object as an argument to another process,

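import multiprocessing as mp

# multiples and function are defined elsewhere in my program
res_queues = []
for _ in multiples:
    res_queues.append(CounterQueue())

process = mp.Process(name="test",
                     target=function,
                     args=(res_queues,))  # trailing comma: (res_queues) alone is not a tuple
process.daemon = True
process.start()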

I get an AttributeError when put() is called in the child process: AttributeError: 'CounterQueue' object has no attribute 'size'. However, the class itself seems correct, since the following code executes without issue:

>>> from python.multiprocessing.queue import CounterQueue
>>> a = CounterQueue()
>>> a.put(1)
>>> a.qsize()
1

Am I missing something Python-specific here?


Solution

  • I'm reasoning this through rather than giving a definitive answer, but what is likely happening is this: for multiprocessing.Queue to work properly, i.e. to act as a queue endpoint when passed to another process, it has to customize its serialization/deserialization. When the process arguments are pickled for the child (as happens with the spawn start method, the default on Windows and macOS), deserializing an mp.Queue subclass does not preserve the instance attributes that the subclass itself added. Your REPL test never sends the queue to another process, so it never goes through that path.

    For reference, here is Queue.__getstate__ from Python 3.12's multiprocessing/queues.py:

        def __getstate__(self):
            context.assert_spawning(self)
            return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                    self._rlock, self._wlock, self._sem, self._opid)

    As you can see, it serializes only that fixed set of attributes. Anything a subclass adds (such as size) is dropped, and because __init__ is not called again when the object is unpickled in the child, size is never recreated there.

    I guess the most transparent workaround for your case is to use composition instead of inheritance: your queue class should "have" a multiprocessing.Queue as an attribute, alongside the .size attribute, and proxy all the Queue methods you need from your class to the underlying queue.
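A minimal sketch of that composition workaround, assuming you only need put(), get(), qsize(), empty() and clear(). To stay self-contained it uses a multiprocessing.Value directly where the question uses SharedCounter; the attribute names queue and size are illustrative choices, not anything the multiprocessing API requires.

```python
import multiprocessing


class CounterQueue:
    """Wraps a multiprocessing queue instead of subclassing it (composition).

    Pickling a plain object like this one copies its __dict__, so both
    attributes below reach a spawned child process: the wrapped queue and
    the shared Value each know how to pickle themselves while a process
    is being spawned.
    """

    def __init__(self, maxsize=0):
        ctx = multiprocessing.get_context()
        self.queue = ctx.Queue(maxsize)  # the wrapped multiprocessing queue
        self.size = ctx.Value("i", 0)    # shared counter (stands in for SharedCounter)

    def put(self, obj, block=True, timeout=None):
        with self.size.get_lock():
            self.size.value += 1
        try:
            self.queue.put(obj, block, timeout)
        except BaseException:
            # Roll back the count if the item was not enqueued (e.g. queue.Full).
            with self.size.get_lock():
                self.size.value -= 1
            raise

    def get(self, block=True, timeout=None):
        # Decrement only after an item was actually received, so a
        # get() that raises queue.Empty leaves the count untouched.
        item = self.queue.get(block, timeout)
        with self.size.get_lock():
            self.size.value -= 1
        return item

    def qsize(self):
        return self.size.value

    def empty(self):
        return self.qsize() == 0

    def clear(self):
        """Remove all elements (can block if another consumer drains the queue concurrently)."""
        while not self.empty():
            self.get()
```

You would pass it to a child exactly as before, e.g. args=(q,). As with the original class, qsize() is only a snapshot when several processes use the queue at once, and because put() increments before enqueuing, the count can briefly include an item that is not yet retrievable.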
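An alternative, which is not what this answer proposes, is to keep subclassing Queue but add size to the pickled state yourself. This sketch assumes Queue.__getstate__ returns its tuple and Queue.__setstate__ accepts that same tuple back, as in the CPython source quoted above; that is an implementation detail of multiprocessing.queues, so the composition approach is the more robust choice. A multiprocessing.Value again stands in for SharedCounter.

```python
import multiprocessing
from multiprocessing.queues import Queue


class CounterQueue(Queue):
    """Sketch: subclass Queue, but carry `size` through pickling explicitly."""

    def __init__(self, maxsize=0):
        # Shared counter (stands in for the question's SharedCounter).
        self.size = multiprocessing.Value("i", 0)
        super().__init__(maxsize, ctx=multiprocessing.get_context())

    def __getstate__(self):
        # Queue.__getstate__ checks that a process is being spawned and returns
        # its fixed tuple; bundle our extra attribute next to it.
        return (super().__getstate__(), self.size)

    def __setstate__(self, state):
        # Restore our attribute, then let Queue rebuild itself from its tuple.
        base_state, self.size = state
        super().__setstate__(base_state)

    def put(self, obj, block=True, timeout=None):
        with self.size.get_lock():
            self.size.value += 1
        super().put(obj, block, timeout)

    def get(self, block=True, timeout=None):
        item = super().get(block, timeout)
        with self.size.get_lock():
            self.size.value -= 1
        return item

    def qsize(self):
        return self.size.value

    def empty(self):
        return self.qsize() == 0
```

Outside of process spawning, __getstate__ still raises RuntimeError via Queue's own assert_spawning check, so this does not make the queue picklable in general; it only keeps size attached when the queue is handed to a new process.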