I am trying to create a class, based on multiprocessing.Queue, that keeps a count of its items:
import multiprocessing
from multiprocessing import Value
from multiprocessing.queues import Queue
class SharedCounter(object):
    """ A synchronized shared counter.
    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.
    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """
    def __init__(self, n = 0):
        self.count = Value('i', n)

    def increment(self, n = 1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value

class CounterQueue(Queue):
    """ A portable implementation of multiprocessing.Queue.
    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """
    def __init__(self, *args, **kwargs):
        self.size = SharedCounter(0)
        super(CounterQueue, self).__init__(ctx=multiprocessing.get_context(), *args, **kwargs)

    def put(self, *args, **kwargs):
        self.size.increment(1)
        super(CounterQueue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        self.size.increment(-1)
        return super(CounterQueue, self).get(*args, **kwargs)

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()

    def clear(self):
        """ Remove all elements from the Queue. """
        while not self.empty():
            self.get()

However, when I try to pass this object as an argument to another process,
for i in range(len(multiples)):
    res_queues.append(CounterQueue())
process = mp.Process(name="test",
                     target=function,
                     args=(res_queues))
process.daemon = True
process.start()
I get an AttributeError when calling put:
AttributeError: 'CounterQueue' object has no attribute 'size'
However, I've confirmed that the class works on its own, since the following code executes without issue:
>>> from python.multiprocessing.queue import CounterQueue
>>> a = CounterQueue()
>>> a.put(1)
>>> a.qsize()
1
I'm wondering if I'm missing something with respect to Python specifics here?
I can reason about this, though I'm not sure it amounts to a complete answer.
What is likely happening is that, for multiprocessing.Queue to work properly (i.e., to act as a queue endpoint when passed to another process), it has to customize its serialization and deserialization. As a result, deserializing an mp.Queue subclass in the other process won't preserve the instance attributes that the subclass added, such as size. That would also explain why your interactive test works: there, the queue is used in the process that created it and is never serialized.
(Fact: here is the code for mp.Queue.__getstate__, taken from multiprocessing/queues.py in Python 3.12:
    def __getstate__(self):
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)
As you can see, it serializes only that fixed set of attributes.)
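To see the effect in isolation, here is a minimal sketch that doesn't involve multiprocessing at all. Base and Sub are hypothetical stand-ins (not part of any library) for mp.Queue and CounterQueue. Base pickles a fixed tuple of attributes, much like the __getstate__ shown above, so anything a subclass adds is lost on a pickle round trip:

```python
import pickle


class Base:
    """Hypothetical stand-in for mp.Queue: pickles a fixed set of attributes."""

    def __init__(self):
        self._maxsize = 5

    def __getstate__(self):
        # Only this tuple is serialized; the instance __dict__ is ignored.
        return (self._maxsize,)

    def __setstate__(self, state):
        # Only the attributes listed in __getstate__ are restored.
        (self._maxsize,) = state


class Sub(Base):
    """Hypothetical stand-in for CounterQueue: adds an extra attribute."""

    def __init__(self):
        super().__init__()
        self.size = 0


# Unpickling does not call __init__, so 'size' is never set on the copy.
clone = pickle.loads(pickle.dumps(Sub()))
print(type(clone).__name__, hasattr(clone, "_maxsize"), hasattr(clone, "size"))
```

The copy is still a Sub instance and still has _maxsize, but accessing clone.size raises AttributeError, which matches the error in the question.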
I guess the most transparent workaround for your case is to use association (composition) instead of inheritance: your queue class should "have" a multiprocessing.Queue as an attribute, as a sibling to the .size attribute, and then you have to proxy all the Queue methods you need from your class to the underlying one.
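A minimal sketch of that composition approach could look like the following. It is an illustration under stated assumptions, not a drop-in replacement: the attribute names _queue and _size are my own, only a handful of methods are proxied, and the counter is kept in a multiprocessing.Value directly rather than in your SharedCounter class.

```python
import multiprocessing


class CounterQueue:
    """Wraps a multiprocessing.Queue and tracks its size in a shared counter."""

    def __init__(self, maxsize=0, ctx=None):
        ctx = ctx or multiprocessing.get_context()
        self._queue = ctx.Queue(maxsize)   # the real queue (assumed attribute name)
        self._size = ctx.Value('i', 0)     # shared counter, guarded by its own lock

    def put(self, *args, **kwargs):
        # Count only after the put succeeded, so a queue.Full error leaves
        # the counter untouched.
        self._queue.put(*args, **kwargs)
        with self._size.get_lock():
            self._size.value += 1

    def get(self, *args, **kwargs):
        # Likewise, decrement only after an item was actually retrieved.
        item = self._queue.get(*args, **kwargs)
        with self._size.get_lock():
            self._size.value -= 1
        return item

    def qsize(self):
        return self._size.value

    def empty(self):
        return self.qsize() == 0

    def close(self):
        # Any other Queue method can be forwarded the same way.
        self._queue.close()
```

Because this class does not define its own __getstate__, pickling falls back to the instance __dict__, so both _queue and _size travel together when the object is passed via Process(args=(queue,)). One design difference from the original: the counter is updated after the underlying put or get returns, so qsize() can briefly lag behind the real queue, but it is not changed by an operation that failed.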