Search code examples
pythonqueuepython-asynciopriority-queue

How to push items off of asyncio.PriorityQueue when it is at maxsize and I put() new items?


I have an asyncio.PriorityQueue that I am using as the URL queue for a web crawler, with the lowest scored URLs being the first removed from the queue when I call url_queue.get(). When the queue reaches maxsize items, the default behavior is to block on calls to url_queue.put(), until a call to get() removes an item from the queue to make space.

What I would like to do is to never block, but instead push off the queue item with the highest score (or least an item with one of the highest scores), whenever I attempt to put() an item that has a lower score. Is there a way to automatically remove items from the bottom of the heap this way in asyncio.PriorityQueue? If not, is there an alternative priority queue / heap implementation that works with asyncio, which would enable me to do this? Or some other data structure / technique that would enable me to have some kind of non-blocking, prioritized queue with a maximum size?

Thanks!


Solution

  • Is there a way to automatically remove items from the bottom of the heap this way in asyncio.PriorityQueue?

    Not by default, but it should be straightforward to inherit from asyncio.PriorityQueue and just implement the desired behavior. Unlike multi-threaded queue implementations, the asyncio queue runs in a single thread and therefore does not need to worry about synchronization issues.

    A possible issue with performance is that PriorityQueue is not designed as a double-ended queue, so it uses a heap to store items. A heap is either min or max, but not both; Python's heapq module implements a min-heap, but you can easily simulate a max-heap by multiplying priorities by -1. In a min-heap one can access and pop the smallest item in logarithmic time, but not the largest one, and in a max-heap it's the other way around. To efficiently manipulate both the smallest and the largest item, you'll need to inherit from asyncio.Queue and use a different data structure to store items, such as a sorted list.

    For example (untested):

    class DroppingPriorityQueue(asyncio.Queue):
        def _init(self, maxsize):
            # called by asyncio.Queue.__init__
            self._queue = sortedcontainers.SortedList()
    
        def _put(self, item):
            # called by asyncio.Queue.put_nowait
            self._queue.add(item)
    
        def _get(self):
            # called by asyncio.Queue.get_nowait
            # pop the first (most important) item off the queue
            return self._queue.pop(0)
    
        def __drop(self):
            # drop the last (least important) item from the queue
            self._queue.pop()
            # no consumer will get a chance to process this item, so
            # we must decrement the unfinished count ourselves
            self.task_done()
    
        def put_nowait(self, item):
            if self.full():
                self.__drop()
            super().put_nowait(item)
    
        async def put(self, item):
            # Queue.put blocks when full, so we must override it.
            # Since our put_nowait never raises QueueFull, we can just
            # call it directly
            self.put_nowait(item)
    

    The class implements two distinct concerns:

    • It overrides the _get, _put, and _init protected methods to use a SortedList as the underlying storage. Although undocumented, these methods are used for building customized queues such as PriorityQueue and LifoQueue and have been in place for decades, first in the Queue module (queue in Python 3) and later in asyncio.queue.
    • It overrides the put and put_nowait public methods to implement the drop-when-full semantics.