I wanted to implement a multiprocessing priority queue. I found this answer by Dano: Strange Queue.PriorityQueue behaviour with multiprocessing in Python 2.7.6
After implementing it, I could use the .get() and .put() functions on my priority queue, but when I used .queue to print the current elements in the queue, it gave me an error.
Code:
    from multiprocessing.managers import SyncManager
    from queue import PriorityQueue

    class MyManager(SyncManager):
        pass

    def get_manager():
        MyManager.register("PriorityQueue", PriorityQueue)  # Register a shared PriorityQueue
        m = MyManager()
        m.start()
        return m

    m = get_manager()
    call = m.PriorityQueue()
    for i in range(5):
        call.put(i)
    print(call.queue)
Error:
    AttributeError: 'AutoProxy[PriorityQueue]' object has no attribute 'queue'
I read the Python documentation for SyncManager and modified my code.
New code:
    from multiprocessing.managers import SyncManager
    from queue import PriorityQueue

    class MyManager(SyncManager):
        pass

    def get_manager():
        # Register a shared PriorityQueue, explicitly listing the exposed names
        MyManager.register("PriorityQueue", PriorityQueue, exposed=['put', 'get', 'queue'])
        m = MyManager()
        m.start()
        return m

    m = get_manager()
    call = m.PriorityQueue()
    for i in range(5):
        call.put(i)
    print(call.queue)
Now the output was:
    <bound method AutoProxy[PriorityQueue].queue of <AutoProxy[PriorityQueue] object, typeid 'PriorityQueue' at 0x7ff3b48f2dd0>>
I am still not getting the elements in the queue. I read about the method_to_typeid argument of the register function, which maps the return types of the functions listed in exposed, but I don't know how to use it.
Can someone help me with this, so that I can print the elements of the queue without popping them from the queue?
You can only use methods of a referent through a proxy. Since PriorityQueue().queue is not a method, but an instance attribute, you need to provide a method which can return the value of this attribute.
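To see the distinction locally, without any manager involved, a quick check along these lines may help. It is only a sketch: the list comprehension approximates the rule a default proxy follows when no exposed list is given (public, callable names), and the variable name public_callables is purely illustrative.

```python
from queue import PriorityQueue

pq = PriorityQueue()
pq.put(1)

# Collect the names that are public (no leading underscore) and callable.
# This mirrors, in simplified form, what a default proxy can forward.
public_callables = [
    name for name in dir(pq)
    if not name.startswith('_') and callable(getattr(pq, name))
]

print('put' in public_callables)    # True: put is a method
print('queue' in public_callables)  # False: queue is a plain list
print(pq.queue)                     # [1]
```

Listing 'queue' in exposed does not change this. The proxy then gets a method named queue, which is why printing call.queue shows a bound method rather than the elements.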
The example below opts for a generalized get_attribute method by subclassing PriorityQueue.
    # Python 3.7.1
    from queue import PriorityQueue
    from multiprocessing.managers import SyncManager
    from multiprocessing import Process

    SENTINEL = None

    class MyPriorityQueue(PriorityQueue):
        def get_attribute(self, name):
            return getattr(self, name)

    class MyManager(SyncManager):
        pass

    def get_manager():
        MyManager.register("PriorityQueue", MyPriorityQueue)
        m = MyManager()
        m.start()
        return m

    def f(q):
        for item in iter(lambda: q.get()[1], SENTINEL):
            print(item)
        print(f'queue: {q.get_attribute("queue")}')

    if __name__ == '__main__':
        m = get_manager()
        pq = m.PriorityQueue()

        tasks = enumerate([f'item_{i}' for i in range(5)] + [SENTINEL])

        for task in tasks:
            pq.put(task)

        print(f'queue: {pq.get_attribute("queue")}')
        print(f'maxsize: {pq.get_attribute("maxsize")}')

        p = Process(target=f, args=(pq,))
        p.start()
        p.join()
Example Output:
    queue: [(0, 'item_0'), (1, 'item_1'), (2, 'item_2'), (3, 'item_3'), (4, 'item_4'), (5, None)]
    maxsize: 0
    item_0
    item_1
    item_2
    item_3
    item_4
    queue: []
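If only the queue contents are needed, a narrower variation is possible. The following is a sketch under assumptions, not part of the standard library: SnapshotPriorityQueue and its snapshot() method are hypothetical names introduced here. The method copies the internal heap list while holding the queue's own mutex and returns it sorted, so the caller sees the items in priority order without removing any of them.

```python
from multiprocessing.managers import SyncManager
from queue import PriorityQueue


class SnapshotPriorityQueue(PriorityQueue):
    """PriorityQueue with a hypothetical snapshot() helper (not in the stdlib)."""

    def snapshot(self):
        # Hold the queue's own lock so the copy is not taken mid-put/get.
        with self.mutex:
            # self.queue is a heap-ordered list; sorted() returns a new
            # list in priority order and leaves the queue untouched.
            return sorted(self.queue)


class MyManager(SyncManager):
    pass


# snapshot() is public and callable, so the default proxy forwards it.
MyManager.register("PriorityQueue", SnapshotPriorityQueue)


if __name__ == "__main__":
    with MyManager() as m:  # starts the manager, shuts it down on exit
        pq = m.PriorityQueue()
        for i in (3, 1, 2):
            pq.put(i)
        print(pq.snapshot())  # [1, 2, 3]
        print(pq.qsize())     # 3, nothing was removed
```

The returned list is a pickled copy sent back to the caller, so it reflects the queue at the moment of the call and does not update afterwards.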