Search code examples
python · macos · multiprocessing · queue

How to get `multiprocessing.queues.Queue.qsize()` on macOS?


This is an old issue, and the commonly suggested workaround does not work.

Below is a complete example showing how the suggested approach fails. Uncomment the `self.size.increment(-1)` line in `MyQueue.get` to reproduce the error.

import multiprocessing
import os
import time
from multiprocessing import get_context
from multiprocessing.queues import Queue


class SharedCounter:
    """Integer counter stored in a ``multiprocessing.Value`` of type ``'i'``."""

    def __init__(self, n=0):
        # ``n`` is the starting count.
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """Add ``n`` to the count while holding the value's lock."""
        shared = self.count
        with shared.get_lock():
            shared.value = shared.value + n

    @property
    def value(self):
        """The current count."""
        return self.count.value


class MyQueue(Queue):
    """``multiprocessing.queues.Queue`` subclass that keeps its own item count.

    The count lives in a ``SharedCounter`` stored on ``self.size`` and is what
    ``qsize()`` reports.
    """

    def __init__(self, *args, **kwargs):
        # The base class always receives the default context from get_context().
        super(MyQueue, self).__init__(*args, ctx=get_context(), **kwargs)
        # Counter backing qsize(); starts at zero.
        self.size = SharedCounter(0)

    def put(self, *args, **kwargs):
        """Increment the counter, then forward all arguments to ``Queue.put``."""
        self.size.increment(1)
        super(MyQueue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        """Forward all arguments to ``Queue.get`` and return its result."""
        # NOTE: while the decrement below stays commented out, nothing ever
        # lowers the counter, so qsize() only grows.
        # self.size.increment(-1)  # uncomment this for error
        return super(MyQueue, self).get(*args, **kwargs)

    def qsize(self):
        """Return the counter's current value."""
        return self.size.value

    def empty(self):
        """Return True when ``qsize()`` is zero."""
        return not self.qsize()

    def clear(self):
        """Call ``get()`` repeatedly until ``empty()`` reports True."""
        while not self.empty():
            self.get()


def worker(queue):
    """Print items taken from ``queue`` until a ``None`` sentinel arrives.

    Each printed line is prefixed with the current process id, and the
    worker sleeps for one second after every item.
    """
    while (item := queue.get()) is not None:
        print(f'[{os.getpid()}]: got {item}')
        time.sleep(1)


if __name__ == '__main__':
    # Each pool process runs worker(q) as its initializer.
    num_processes = 4
    q = MyQueue()
    pool = multiprocessing.Pool(
        processes=num_processes, initializer=worker, initargs=(q,)
    )

    # Enqueue ten "hello"/"world" pairs.
    for _ in range(10):
        for word in ("hello", "world"):
            q.put(word)

    # One None sentinel per pool process so every worker loop can exit.
    for _ in range(num_processes):
        q.put(None)

    # Close the queue and join its background thread, then shut the pool down.
    q.close()
    q.join_thread()
    pool.close()
    pool.join()

For some reason, the newly defined MyQueue forgets about the size attribute.

Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
Process SpawnPoolWorker-2:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
Process SpawnPoolWorker-4:
Process SpawnPoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'

Solution

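  • You didn't override `__getstate__` and `__setstate__`, which pickle uses to control serialization (see "Handling Stateful Objects" in the pickle documentation). `multiprocessing.queues.Queue` defines its own versions that serialize only the queue's internal state, so the extra `size` attribute is dropped when the queue is sent to a spawned worker process. Override both methods so that your variable is included in what is serialized: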

    import multiprocessing
    import os
    import time
    from multiprocessing import get_context
    from multiprocessing.queues import Queue
    
    class SharedCounter:
        """Integer counter stored in a ``multiprocessing.Value`` of type ``'i'``."""

        def __init__(self, n=0):
            # ``n`` is the starting count.
            self.count = multiprocessing.Value('i', n)

        def increment(self, n=1):
            """Add ``n`` to the count while holding the value's lock."""
            shared = self.count
            with shared.get_lock():
                shared.value = shared.value + n

        @property
        def value(self):
            """The current count."""
            return self.count.value
    
    class MyQueue(Queue):
        """``multiprocessing.queues.Queue`` with its own ``qsize()`` bookkeeping.

        The item count is tracked in a ``SharedCounter`` rather than taken from
        the base class's ``qsize()``.  The counter is added to the pickled state
        so that processes receiving the queue share the same count.
        """

        def __init__(self, *args, **kwargs):
            # Default to the current context, but let callers supply their own
            # ``ctx`` instead of failing with a duplicate-keyword TypeError.
            kwargs.setdefault('ctx', get_context())
            super().__init__(*args, **kwargs)
            self.size = SharedCounter(0)

        def __getstate__(self):
            """Return the base queue state paired with the shared counter."""
            return (super().__getstate__(), self.size)

        def __setstate__(self, state):
            """Restore the base queue state, then re-attach the shared counter."""
            super().__setstate__(state[0])
            self.size = state[1]

        def put(self, *args, **kwargs):
            """Put an item on the queue and count it.

            Accepts the same arguments as ``Queue.put`` and re-raises whatever
            it raises (for example ``queue.Full``).
            """
            # Count first so that a consumer which receives the item right away
            # can never drive the counter below zero.
            self.size.increment(1)
            try:
                super().put(*args, **kwargs)
            except BaseException:
                # The item was not enqueued: undo the optimistic increment.
                self.size.increment(-1)
                raise

        def get(self, *args, **kwargs):
            """Remove and return an item, then decrement the count.

            Accepts the same arguments as ``Queue.get``.  The counter is only
            touched after the item has actually been retrieved, so a timeout
            (``queue.Empty``) leaves the count unchanged.
            """
            item = super().get(*args, **kwargs)
            self.size.increment(-1)
            return item

        def qsize(self):
            """Return the number of items counted as being in the queue."""
            return self.size.value

        def empty(self):
            """Return True when the counted size is zero."""
            return not self.qsize()

        def clear(self):
            """Drain the queue by calling ``get()`` until it reports empty.

            NOTE(review): ``get()`` blocks, so this may wait if another
            consumer takes the last item between the ``empty()`` check and
            the ``get()`` call.
            """
            while not self.empty():
                self.get()
    
    def worker(queue):
        """Print items taken from ``queue`` until a ``None`` sentinel arrives.

        Each printed line is prefixed with the current process id, and the
        worker sleeps for one second after every item.
        """
        while (item := queue.get()) is not None:
            print(f'[{os.getpid()}]: got {item}')
            time.sleep(1)
    
    if __name__ == '__main__':
        # Each pool process runs worker(q) as its initializer.
        num_processes = 4
        q = MyQueue()
        pool = multiprocessing.Pool(
            processes=num_processes, initializer=worker, initargs=(q,)
        )

        # Enqueue ten "hello"/"world" pairs.
        for _ in range(10):
            for word in ("hello", "world"):
                q.put(word)

        # One None sentinel per pool process so every worker loop can exit.
        for _ in range(num_processes):
            q.put(None)

        # Close the queue and join its background thread, then shut the pool down.
        q.close()
        q.join_thread()
        pool.close()
        pool.join()
    

    Note that in Python 3 you don't need to write super(MyQueue, self); a bare super() is enough. It also makes the class easier to rename and refactor later, so consider replacing every super(X, self) call with just super().