
Overriding multiprocessing.queues.Queue put method


I want to implement a multiprocessing.Queue that does not add an element that already exists. Using the standard-library queue.Queue I had no problem, following this response. For multiprocessing I ran into some issues, which I solved thanks to this answer. I do the following:

from multiprocessing.queues import Queue
from multiprocessing import get_context


class CustomQueue(Queue):
    def put(self, obj, block=True, timeout=None):
        if obj not in self:
            return super().put(obj, block, timeout)

    def __contains__(self, item):
        with self.mutex:
            return item in self.queue

custom_queue = CustomQueue(ctx=get_context())

However, when I call the put method I get an AttributeError: 'CustomQueue' object has no attribute 'mutex'

How can I solve this issue? Thank you in advance.
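
For context, the thread-based version I started from (following the linked response) worked because queue.Queue really does expose these attributes. A sketch of what that looked like (the UniqueQueue name is mine):

from queue import Queue as ThreadQueue

class UniqueQueue(ThreadQueue):
    def put(self, item, block=True, timeout=None):
        if item not in self:
            super().put(item, block, timeout)

    def __contains__(self, item):
        # queue.Queue has a `mutex` lock and a `queue` deque;
        # multiprocessing.queues.Queue has neither.
        with self.mutex:
            return item in self.queue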


I read the code of multiprocessing.queues.Queue and changed my put override to this:

from queue import Full  # for the Full exception raised below

class CustomQueue(Queue):
    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            if obj not in self._buffer:
                self._buffer.append(obj)
                self._notempty.notify()

But it still does not work. self._buffer seems to be the queue (it is a collections.deque object), yet obj not in self._buffer always returns True. Why is this happening?


Solution
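
First, as to why obj not in self._buffer always returns True: multiprocessing.Queue hands every item to a background feeder thread that serializes it into a pipe almost as soon as put appends it, so _buffer is only a transient staging deque and is nearly always empty. Moreover, _buffer is not part of the queue's pickled state, so each process gets its own empty copy and can never see items buffered by another process. A quick (timing-dependent) demonstration:

    from multiprocessing import Queue
    import time

    q = Queue()
    q.put(1)
    time.sleep(0.1)        # give the feeder thread time to run
    print(len(q._buffer))  # 0 -- the item now lives in the underlying pipe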

  • Rather than assuming how a queue.Queue is implemented, which could change in the future, I would prefer the following. To keep track of what has been put on the queue but not yet gotten, without assuming anything about how the queue is implemented, we need a separate set or dict that is updated on every get or put call, and that structure must be shareable across processes. For this we can use a managed dictionary (which, unfortunately, will slow down get and put operations -- but no more than your implementation that is based on a managed queue):

    from multiprocessing import Queue, Lock, Manager, Process
    
    class CustomQueue:
        def __init__(self, syncmanager, maxsize=0):
            self._queue = Queue(maxsize)
            self._lock = Lock()
            self._items = syncmanager.dict()
    
        def put(self, obj, block=True, timeout=None):
            with self._lock:
                if obj not in self._items:
                    self._queue.put(obj, block=block, timeout=timeout)
                    # Record obj only after a successful put, so that a Full
                    # exception cannot leave behind a stale entry that would
                    # block this item forever:
                    self._items[obj] = None
    
        def get(self, block=True, timeout=None):
            obj = self._queue.get(block=block, timeout=timeout)
            # No exception raised so we retrieved an item:
            with self._lock:
                if obj in self._items:
                    del self._items[obj]  # remove
            return obj
    
        def __getattr__(self, name):
            """Delegate all other methods to the wrapped Queue."""
            # Do not forward the pickling hooks to self._queue: while an
            # instance is being unpickled, self._queue does not exist yet,
            # so forwarding would recurse infinitely.
            if name == '__getstate__':
                return object.__getstate__

            if name == '__setstate__':
                return object.__setstate__

            return getattr(self._queue, name)
    
    def putter1(q):
        q.put(1)
        q.put(2)
    
    def putter2(q):
        q.put(1)
        q.put(3)
    
    def main():
        with Manager() as manager:
            q = CustomQueue(manager)
    
            p1 = Process(target=putter1, args=(q,))
            p1.start()
    
            p2 = Process(target=putter2, args=(q,))
            p2.start()
    
            # Give subprocesses a chance to put their items:
            import time
            time.sleep(1)
    
            for _ in range(3):
                print(q.get())
    
            p1.join()
            p2.join()
    
    if __name__ == '__main__':
        main()
    

    Prints:

    1
    2
    3
    
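    Note that because seen items become keys of the managed dictionary, anything you put on this CustomQueue must be hashable:

    q.put((1, 2))   # fine: tuples are hashable
    q.put([1, 2])   # raises TypeError: unhashable type: 'list'
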

    If you are not concerned about how a queue.Queue implementation might change in the future, I would still make changes to your implementation, in which I see a race condition. Your __contains__ method checks under a lock whether the item being put is already in the queue, but by the time it returns True or False to the caller, put, it has released the lock. There is then a window in which another process may put a duplicate item on the queue before put places the same item on it again. The test for the presence of the item and the actual placing of the item on the queue therefore constitute a critical section, and both operations must be done under control of a single lock:

    from queue import Queue
    from threading import Lock
    from multiprocessing.managers import BaseManager
    from multiprocessing import Process
    
    
    class MyManager(BaseManager):
        pass
    
    
    class NoRepetitionQueue(Queue):
        def __init__(self, maxsize=0):
            super().__init__(maxsize)
            self._lock = Lock()
    
        def put(self, obj, block=True, timeout=None):
            with self._lock:
                if obj not in self.queue:
                    return super().put(obj, block=block, timeout=timeout)
    
    
    MyManager.register("NoRepetitionQueue", NoRepetitionQueue)
    
    def putter1(q):
        q.put(1)
        q.put(2)
    
    def putter2(q):
        q.put(1)
        q.put(3)
    
    def main():
        with MyManager() as manager:
            q = manager.NoRepetitionQueue()
    
            p1 = Process(target=putter1, args=(q,))
            p1.start()
    
            p2 = Process(target=putter2, args=(q,))
            p2.start()
    
            # Give subprocesses a chance to put their items:
            import time
            time.sleep(1)
    
            for _ in range(3):
                print(q.get())
    
            p1.join()
            p2.join()
    
    if __name__ == '__main__':
        main()
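
    With this version the manager hosts a single NoRepetitionQueue instance in its own process, and q is a proxy object: every put and get from the worker processes is forwarded to that one instance, which is why a plain threading.Lock is sufficient to make the membership test and the put atomic.

    One more detail: obj not in self.queue scans the underlying deque, which is O(n). If that ever matters, a set kept in lockstep with the queue gives O(1) lookups. A minimal sketch of that variant (the _seen set is my own addition, not something queue.Queue provides):

    from queue import Queue
    from threading import Lock

    class NoRepetitionQueue(Queue):
        def __init__(self, maxsize=0):
            super().__init__(maxsize)
            self._lock = Lock()
            self._seen = set()  # mirrors the items currently on the queue

        def put(self, obj, block=True, timeout=None):
            with self._lock:
                if obj not in self._seen:  # O(1) instead of scanning the deque
                    super().put(obj, block=block, timeout=timeout)
                    self._seen.add(obj)

        def get(self, block=True, timeout=None):
            obj = super().get(block=block, timeout=timeout)
            with self._lock:
                self._seen.discard(obj)
            return obj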