I have an asyncio.PriorityQueue that I am using as the URL queue for a web crawler, with the lowest-scored URLs being the first removed from the queue when I call url_queue.get(). When the queue reaches maxsize items, the default behavior is to block on calls to url_queue.put() until a call to get() removes an item from the queue to make space.

What I would like to do is to never block, but instead push off the queue item with the highest score (or at least an item with one of the highest scores) whenever I attempt to put() an item that has a lower score. Is there a way to automatically remove items from the bottom of the heap this way in asyncio.PriorityQueue? If not, is there an alternative priority queue / heap implementation that works with asyncio, which would enable me to do this? Or some other data structure / technique that would enable me to have some kind of non-blocking, prioritized queue with a maximum size?
Thanks!
> Is there a way to automatically remove items from the bottom of the heap this way in asyncio.PriorityQueue?
Not by default, but it should be straightforward to inherit from asyncio.PriorityQueue and implement the desired behavior. Unlike multi-threaded queue implementations, an asyncio queue is used from a single thread (the one running the event loop), so a subclass does not need to worry about synchronization issues.
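A minimal sketch of that approach, keeping PriorityQueue's own heap. It assumes items are mutually comparable (for example (score, url) tuples), it relies on the undocumented _queue attribute in which PriorityQueue keeps its heap, and the class name HeapDroppingPriorityQueue is hypothetical. Evicting the worst item needs a linear scan plus a re-heapify, so each drop costs O(n):

```python
import asyncio
import heapq


class HeapDroppingPriorityQueue(asyncio.PriorityQueue):
    """Never blocks on put; when full, evicts the largest (least urgent) item.

    Reuses PriorityQueue's heap (the private self._queue list), so eviction
    costs O(n): a linear scan to find the largest item plus a re-heapify.
    """

    def put_nowait(self, item):
        if self.full():
            worst = max(self._queue)
            if item >= worst:
                # The new item is no more urgent than anything queued: discard it.
                return
            self._queue.remove(worst)
            heapq.heapify(self._queue)
            # The evicted item will never be consumed, so balance its put().
            self.task_done()
        super().put_nowait(item)

    async def put(self, item):
        # put_nowait never raises QueueFull here, so there is nothing to wait for.
        self.put_nowait(item)


async def main():
    q = HeapDroppingPriorityQueue(maxsize=3)
    for score in (5, 1, 4, 2, 9, 0):
        await q.put(score)
    return [q.get_nowait() for _ in range(q.qsize())]


print(asyncio.run(main()))
```

Here 5 and 4 are evicted to make room for 2 and 0, while 9 is discarded because it is worse than everything already queued.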
A possible performance issue is that PriorityQueue stores its items in a heap, which is not a double-ended structure. A heap is either min or max, but not both; Python's heapq module implements a min-heap, but you can easily simulate a max-heap by negating the priorities. In a min-heap you can peek at the smallest item in constant time and pop it in logarithmic time, but finding the largest item requires a linear scan; in a max-heap it's the other way around. To efficiently manipulate both the smallest and the largest item, you'll need to inherit from asyncio.Queue and use a different data structure to store items, such as a sorted list.
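To make the min-heap/max-heap trade-off concrete, here is a small illustration with the standard heapq module (the scores are arbitrary):

```python
import heapq

scores = [5, 1, 4, 2, 9]

# Min-heap: the smallest item sits at index 0 (O(1) peek, O(log n) pop).
min_heap = list(scores)
heapq.heapify(min_heap)
smallest = min_heap[0]              # 1
largest = max(min_heap)             # 9, but this is an O(n) scan

# Max-heap simulated by negating priorities: now the largest is at index 0.
max_heap = [-s for s in scores]
heapq.heapify(max_heap)
largest_via_max_heap = -max_heap[0]  # 9
popped = -heapq.heappop(max_heap)    # 9; the next largest (5) moves to the top

print(smallest, largest, largest_via_max_heap, popped, -max_heap[0])
```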
For example (untested), using SortedList from the third-party sortedcontainers package:
```python
import asyncio

import sortedcontainers  # third-party: pip install sortedcontainers


class DroppingPriorityQueue(asyncio.Queue):
    def _init(self, maxsize):
        # called by asyncio.Queue.__init__
        self._queue = sortedcontainers.SortedList()

    def _put(self, item):
        # called by asyncio.Queue.put_nowait
        self._queue.add(item)

    def _get(self):
        # called by asyncio.Queue.get_nowait
        # pop the first (most important) item off the queue
        return self._queue.pop(0)

    def __drop(self):
        # drop the last (least important) item from the queue
        self._queue.pop()
        # no consumer will get a chance to process this item, so
        # we must decrement the unfinished count ourselves
        self.task_done()

    def put_nowait(self, item):
        if self.full():
            if item >= self._queue[-1]:
                # the new item is no more important than the least
                # important queued one, so discard it instead
                return
            self.__drop()
        super().put_nowait(item)

    async def put(self, item):
        # Queue.put blocks when full, so we must override it.
        # Since our put_nowait never raises QueueFull, we can just
        # call it directly
        self.put_nowait(item)
```
The class implements two distinct concerns:

* It overrides the _get, _put, and _init protected methods to use a SortedList as the underlying storage. Although undocumented, these methods are used for building customized queues such as PriorityQueue and LifoQueue and have been in place for decades, first in the Queue module (queue in Python 3) and later in asyncio.queues.
* It overrides the put and put_nowait public methods to implement the drop-when-full semantics.
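Because the storage lives entirely behind the _init, _put, and _get hooks, it can be swapped without touching the drop logic. As a hedged, stdlib-only sketch (BisectDroppingPriorityQueue is a hypothetical name), a plain list kept sorted with bisect.insort can stand in for SortedList; insertion and pop(0) both shift list elements and are therefore O(n), which may be acceptable for a modest maxsize. The demo at the bottom also checks that join() still returns after items have been dropped, thanks to the task_done() call on eviction:

```python
import asyncio
import bisect


class BisectDroppingPriorityQueue(asyncio.Queue):
    """Stdlib-only sketch: a plain list kept sorted with bisect.insort."""

    def _init(self, maxsize):
        self._queue = []

    def _put(self, item):
        # O(n): elements after the insertion point are shifted.
        bisect.insort(self._queue, item)

    def _get(self):
        # Smallest (most urgent) item is at the front; pop(0) is also O(n).
        return self._queue.pop(0)

    def put_nowait(self, item):
        if self.full():
            if item >= self._queue[-1]:
                # No better than the worst queued item: discard the new one.
                return
            self._queue.pop()   # evict the largest (least urgent) item
            self.task_done()    # it will never be consumed
        super().put_nowait(item)

    async def put(self, item):
        # put_nowait never raises QueueFull, so put never has to wait.
        self.put_nowait(item)


async def main():
    q = BisectDroppingPriorityQueue(maxsize=2)
    for item in [(3, "c"), (1, "a"), (2, "b"), (5, "e")]:
        await q.put(item)

    got = []
    while not q.empty():
        got.append(await q.get())
        q.task_done()
    # join() returns because evicted items were already marked done.
    await asyncio.wait_for(q.join(), timeout=1)
    return got


print(asyncio.run(main()))
```

With maxsize=2, (3, "c") is evicted to make room for (2, "b"), and (5, "e") is discarded because it is worse than both queued items.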