I'm relatively new to Python. I'm trying to work with the queue.PriorityQueue()
class
and have a question. Is there a way to check if a particular priority exists?
For eg., I have the following (priority, element)
:
(0,A), (1,B), (2,C), (2,D)
Is there a way I can check if priority 2
exists? (Yes, since C
and D
are both 2.)
Can I get elements for a particular priority? So, if I want elements with priority 2
it gives me C
and D
?
The only official documentation I've seen on the priority queue has been : Priority Queue official documentation
Is there any other documentation on this class? Like methods I can use? Structure of the class/ fields available?
My understanding is that your end goal is to have tasks with the same priority placed on a list and returned together. So:
If you were to look at the code for queue.PriorityQueue
(I did), you would see that it is based on the heappush
and heappop
methods of the heapq
module, which implements the heap queue algorithm. See heapq. If you look a little further on this page, they even show how you can use a heapq to implement a priority queue. This implementation is a bit more complicated then what you need in that it supports the ability to change the priority of an already added task and it doesn't quite handle multiple tasks with the same priority as you would like. But those changes are easy enough to make:
from heapq import heappush, heappop
class PriorityQueue:
def __init__(self):
self._pq = [] # list of entries arranged in a heap
self._priority_finder = {} # mapping of priority to entries
def add_task(self, task, priority=0):
'Add a new task'
# any tasks with this priority?
entry = self._priority_finder.get(priority)
if entry:
entry[1].append(task)
else:
entry = [priority, [task]]
self._priority_finder[priority] = entry
heappush(self._pq, entry)
def pop_task(self):
'Remove and return the lowest priority tasks. Raise KeyError if empty.'
if not self._pq:
raise KeyError('pop from an empty priority queue')
priority, tasks = heappop(self._pq)
del self._priority_finder[priority]
return priority, tasks
def __bool__(self):
'return True if any tasks on the queue'
return True if self._pq else False
pq = PriorityQueue()
pq.add_task('a', 4) # task 'a' with priority 4
pq.add_task('b', 2)
pq.add_task('c', 4)
pq.add_task('d', 2)
pq.add_task('e', 1)
while pq:
print(pq.pop_task())
Prints:
(1, ['e'])
(2, ['b', 'd'])
(4, ['a', 'c'])