
Is there a way to check if a priority exists in a priority queue in Python?


I'm relatively new to Python. I'm trying to work with the queue.PriorityQueue() class and have a question. Is there a way to check if a particular priority exists?

For example, I have the following (priority, element) pairs:

(0,A), (1,B), (2,C), (2,D)

Is there a way I can check if priority 2 exists? (Yes, since C and D are both 2.) Can I get elements for a particular priority? So, if I want elements with priority 2 it gives me C and D?

The only official documentation I've seen on the priority queue is the Priority Queue official documentation.

Is there any other documentation on this class, such as the methods I can use or the structure of the class and the fields available?


Solution

  • My understanding is that your end goal is to have tasks with the same priority placed in a list and returned together. So:

    If you were to look at the code for queue.PriorityQueue (I did), you would see that it is based on the heappush and heappop functions of the heapq module, which implements the heap queue algorithm. See the heapq documentation. A little further down that page, it even shows how you can use heapq to implement a priority queue. That implementation is a bit more complicated than what you need, in that it supports changing the priority of an already added task, and it doesn't quite handle multiple tasks with the same priority the way you would like. But those changes are easy enough to make:

    from heapq import heappush, heappop
    
    class PriorityQueue:
        def __init__(self):
            self._pq = [] # list of entries arranged in a heap
            self._priority_finder = {} # mapping of priority to entries
    
        def add_task(self, task, priority=0):
            'Add a new task'
            # any tasks with this priority?
            entry = self._priority_finder.get(priority)
            if entry:
                entry[1].append(task)
            else:
                entry = [priority, [task]]
                self._priority_finder[priority] = entry
                heappush(self._pq, entry)
    
        def pop_task(self):
            'Remove and return (priority, tasks) for the smallest priority value. Raise KeyError if empty.'
            if not self._pq:
                raise KeyError('pop from an empty priority queue')
            priority, tasks = heappop(self._pq)
            del self._priority_finder[priority]
            return priority, tasks
    
        def __bool__(self):
            'Return True if any tasks are on the queue.'
            return bool(self._pq)
    
    pq = PriorityQueue()
    pq.add_task('a', 4) # task 'a' with priority 4
    pq.add_task('b', 2)
    pq.add_task('c', 4)
    pq.add_task('d', 2)
    pq.add_task('e', 1)
    while pq:
        print(pq.pop_task())
    

    Prints:

    (1, ['e'])
    (2, ['b', 'd'])
    (4, ['a', 'c'])
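
To answer the original question directly (does priority 2 exist, and which elements have it?), the same priority-to-entry dictionary can be queried without popping anything. The following is only a minimal sketch under that assumption: the class name LookupPriorityQueue and the methods has_priority and tasks_with_priority are hypothetical names chosen for illustration, and are not part of queue.PriorityQueue or heapq.

```python
from heapq import heappush, heappop


class LookupPriorityQueue:
    """Groups tasks by priority and allows lookups by priority (illustrative sketch)."""

    def __init__(self):
        self._pq = []               # heap of [priority, tasks] entries
        self._priority_finder = {}  # maps each priority to its heap entry

    def add_task(self, task, priority=0):
        """Add a task, grouping it with any tasks of the same priority."""
        entry = self._priority_finder.get(priority)
        if entry is None:
            # First task at this priority: create the entry and push it on the heap.
            entry = [priority, []]
            self._priority_finder[priority] = entry
            heappush(self._pq, entry)
        entry[1].append(task)

    def pop_task(self):
        """Remove and return (priority, tasks) for the smallest priority value."""
        if not self._pq:
            raise KeyError('pop from an empty priority queue')
        priority, tasks = heappop(self._pq)
        del self._priority_finder[priority]
        return priority, tasks

    def has_priority(self, priority):
        """Hypothetical helper: is any task queued at this priority?"""
        return priority in self._priority_finder

    def tasks_with_priority(self, priority):
        """Hypothetical helper: return a copy of the tasks at this priority."""
        entry = self._priority_finder.get(priority)
        return list(entry[1]) if entry else []


pq = LookupPriorityQueue()
for priority, task in [(0, 'A'), (1, 'B'), (2, 'C'), (2, 'D')]:
    pq.add_task(task, priority)

print(pq.has_priority(2))         # True
print(pq.tasks_with_priority(2))  # ['C', 'D']
print(pq.has_priority(3))         # False
```

Both lookups are dictionary operations, so they take constant time on average and leave the heap untouched. Returning a copy of the task list keeps callers from modifying the queue's internal state by accident.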