This is an old issue, and the suggested workaround does not work.
Below is a complete example showing how the suggested approach fails. Uncomment line 31 (the `self.size.increment(-1)` call inside `MyQueue.get`) to reproduce the error.
import multiprocessing
import os
import time
from multiprocessing import get_context
from multiprocessing.queues import Queue
class SharedCounter:
    """Integer counter stored in a ``multiprocessing.Value``.

    The number lives in ``self.count`` (a ``Value`` of type code ``'i'``);
    updates are made while holding that value's own lock.
    """

    def __init__(self, n=0):
        """Start the counter at ``n`` (defaults to 0)."""
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """Add ``n`` to the counter; pass a negative ``n`` to subtract."""
        shared = self.count
        # Read-modify-write is done under the Value's lock.
        with shared.get_lock():
            shared.value = shared.value + n

    @property
    def value(self):
        """Current count, read without taking the lock."""
        return self.count.value
class MyQueue(Queue):
    """``multiprocessing`` queue that keeps its own item count.

    The count is held in a ``SharedCounter`` stored on ``self.size`` and is
    what ``qsize()`` and ``empty()`` report. In this version only ``put()``
    touches the counter; the matching decrement in ``get()`` is commented out.
    """

    def __init__(self, *args, **kwargs):
        """Build the base queue with the default context, then add the counter."""
        context = get_context()
        super(MyQueue, self).__init__(*args, ctx=context, **kwargs)
        self.size = SharedCounter(0)

    def put(self, *args, **kwargs):
        """Bump the counter, then forward all arguments to ``Queue.put``."""
        self.size.increment(1)
        super(MyQueue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        """Forward all arguments to ``Queue.get`` and return the item."""
        # self.size.increment(-1) # uncomment this for error
        item = super(MyQueue, self).get(*args, **kwargs)
        return item

    def qsize(self):
        """Return the value of the size counter."""
        return self.size.value

    def empty(self):
        """Return True when the size counter reads zero."""
        return self.qsize() == 0

    def clear(self):
        """Call ``get()`` repeatedly until ``empty()`` reports True."""
        while True:
            if self.empty():
                return
            self.get()
def worker(queue):
    """Consume items from ``queue`` until a ``None`` sentinel arrives.

    Each non-sentinel item is printed together with this process's PID,
    followed by a one-second sleep.
    """
    # `None` is the stop signal; everything else is treated as work.
    while (item := queue.get()) is not None:
        print(f'[{os.getpid()}]: got {item}')
        time.sleep(1)
if __name__ == '__main__':
    # Driver: start a pool whose initializer is the consumer loop, feed the
    # shared queue, then send one `None` sentinel per worker and shut down.
    num_processes = 4
    q = MyQueue()
    pool = multiprocessing.Pool(num_processes, initializer=worker, initargs=(q,))

    # Ten "hello"/"world" pairs, in that order.
    for _ in range(10):
        for word in ("hello", "world"):
            q.put(word)

    # One stop sentinel for each worker process.
    for _ in range(num_processes):
        q.put(None)

    q.close()
    q.join_thread()
    pool.close()
    pool.join()
For some reason, the newly defined `MyQueue` forgets about the `size` attribute:
Process SpawnPoolWorker-1:
Traceback (most recent call last):
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
initializer(*initargs)
File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
item = queue.get()
^^^^^^^^^^^
File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
self.size.increment(-1) # uncomment this for error
^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
Process SpawnPoolWorker-2:
Traceback (most recent call last):
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
initializer(*initargs)
File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
item = queue.get()
^^^^^^^^^^^
File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
self.size.increment(-1) # uncomment this for error
^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
Process SpawnPoolWorker-4:
Process SpawnPoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
initializer(*initargs)
File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
item = queue.get()
^^^^^^^^^^^
File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
self.size.increment(-1) # uncomment this for error
^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
initializer(*initargs)
File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
item = queue.get()
^^^^^^^^^^^
File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
self.size.increment(-1) # uncomment this for error
^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
Well, you didn't override `__setstate__` and `__getstate__` to include your variable. These methods are used by pickle to control serialization (see "Handling Stateful Objects" in the pickle documentation), so you should override them to add your variable to what's being serialized.
import multiprocessing
import os
import time
from multiprocessing import get_context
from multiprocessing.queues import Queue
class SharedCounter:
    """Counter whose integer state is held in a ``multiprocessing.Value``."""

    def __init__(self, n=0):
        """Create the backing ``Value`` (type code ``'i'``) initialised to ``n``."""
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """Change the counter by ``n`` while holding the ``Value``'s lock."""
        lock = self.count.get_lock()
        with lock:
            current = self.count.value
            self.count.value = current + n

    @property
    def value(self):
        """The current count (read without acquiring the lock)."""
        return self.count.value
class MyQueue(Queue):
    """``multiprocessing`` queue that tracks its size in a ``SharedCounter``.

    The counter is stored on ``self.size`` and backs ``qsize()`` and
    ``empty()``. The base class defines its own ``__getstate__`` /
    ``__setstate__``, so both are extended here to carry ``self.size`` along
    when the queue is pickled for a child process.
    """

    def __init__(self, *args, **kwargs):
        """Build the base queue with the default context, then add the counter."""
        super().__init__(*args, ctx=get_context(), **kwargs)
        self.size = SharedCounter(0)

    def __getstate__(self):
        """Return ``(base_queue_state, size_counter)`` for pickling."""
        return (super().__getstate__(), self.size)

    def __setstate__(self, state):
        """Restore the base queue from ``state[0]`` and the counter from ``state[1]``."""
        base_state, size = state
        super().__setstate__(base_state)
        self.size = size

    def put(self, *args, **kwargs):
        """Enqueue an item, counting it only if the enqueue succeeds.

        Arguments are forwarded unchanged to ``Queue.put``; any exception it
        raises propagates to the caller.
        """
        # Count first so a consumer can never decrement below zero for an item
        # it already received; undo the count if the put itself fails.
        self.size.increment(1)
        try:
            super().put(*args, **kwargs)
        except BaseException:
            self.size.increment(-1)
            raise

    def get(self, *args, **kwargs):
        """Dequeue and return an item, decrementing the counter afterwards.

        Arguments are forwarded unchanged to ``Queue.get``; any exception it
        raises propagates to the caller.
        """
        # Decrement only after an item was really obtained: a blocked, timed-out
        # or failed get must not change the count.
        item = super().get(*args, **kwargs)
        self.size.increment(-1)
        return item

    def qsize(self):
        """Return the value of the size counter."""
        return self.size.value

    def empty(self):
        """Return True when the size counter reads zero."""
        return not self.qsize()

    def clear(self):
        """Call ``get()`` repeatedly until ``empty()`` reports True.

        Uses the default (blocking) ``get()``.
        """
        while not self.empty():
            self.get()
def worker(queue):
    """Pull items off ``queue`` and print them until ``None`` is received.

    After printing an item (prefixed with the current PID) the function
    sleeps for one second before fetching the next one.
    """
    item = queue.get()
    while item is not None:
        print(f'[{os.getpid()}]: got {item}')
        time.sleep(1)
        item = queue.get()
if __name__ == '__main__':
    # Driver: the pool's initializer is the consumer loop, so each of the
    # worker processes starts reading from the shared queue immediately.
    num_processes = 4
    q = MyQueue()
    pool = multiprocessing.Pool(
        num_processes,
        initializer=worker,
        initargs=(q,),
    )

    # Enqueue ten "hello"/"world" pairs, in that order.
    for _ in range(10):
        for word in ("hello", "world"):
            q.put(word)

    # One `None` sentinel per worker tells each consumer loop to stop.
    for _ in range(num_processes):
        q.put(None)

    q.close()
    q.join_thread()
    pool.close()
    pool.join()
Note that in Python 3 we don't need to use `super(MyQueue, self)`, as `super()` would suffice; it also makes it easier to rename your class in the future and has other portability and refactoring benefits. So consider replacing any `super(x, y)` with just `super()`.