With a regular list I could sort the list based on an object's attribute with:
queue.sort(key=lambda weed: (weed.x_coord), reverse=True)
However, this is not possible with a multiprocessing queue, so how can I accomplish the same sorting with one? Or is it better to avoid a multiprocessing queue if I want the queue to be sorted in the end?
The requirement is that the queue/list should be thread-safe and process-safe, since the queue/list will be populated by two processes running in parallel.
The two processes (p1 and p2) that insert the objects into the shared queue will continue to run alongside a third process (state machine) that reads from the queue (see code below). I.e., the state machine process will not wait for the p1 and p2 processes to end.
The implementation so far:
import multiprocessing

class Weed():
    x=None
    y=None
    def __init__(self,x,y):
        self.x=x
        self.y=y

def p1(q):
    """
    Function that inserts weed in the shared queue
    """
    # put a few weeds on the shared queue
    q.put(Weed(10.1,7.3))
    q.put(Weed(8.3,2.8))
    q.put(Weed(5.1,4.2))
    q.put(Weed(15.4,5.0))

def p2(q):
    """
    Function that inserts weed in the shared queue
    """
    # put a few weeds on the shared queue
    q.put(Weed(25.1,1))
    q.put(Weed(1.3,1))
    q.put(Weed(9.1,1))
    q.put(Weed(13.4,1))

def state_machine(q):
    """
    Function that sorts the queue (w.r.t x-coord.) and prints it out
    """
    print("Queue elements:")
    while not q.empty():
        q.sort(key=lambda x: (x.x), reverse=True) # Gives error -
        print(q.get().x)
    print("Queue is now empty!")

if __name__ == "__main__":
    # creating multiprocessing Queue
    q = multiprocessing.Queue()

    # creating new processes
    p1 = multiprocessing.Process(target=p1, args=(q,))
    p2 = multiprocessing.Process(target=p2, args=(q,))
    p3 = multiprocessing.Process(target=state_machine, args=(q,))

    # running process p1 to generate some weeds
    p1.start()
    # running process p2 to generate some weeds
    p2.start()
    # running process p3 to sort the weed queue (by x coord.) and print them out
    p3.start()

    p1.join()
    p2.join()
    p3.join()
In your example, the 3 processes do not run concurrently (you start them and join them before starting the next one). I'll assume the real-world case does have concurrency.
Be careful though: in a real-world case, an empty queue does not mean the other tasks have finished. You'll need another synchronisation mechanism.
My suggestion would be to fall back to a regular list inside the state_machine function, and transfer elements from the multiprocessing queue to the list as they arrive. You can then sort the list and print the elements in order. You won't have concurrency issues, as the internal list is only modified by the process running state_machine.
def state_machine(q):
    """
    Function that sorts the queue (w.r.t x-coord.) and prints it out
    """
    print("Queue elements:")
    internal_queue = []
    # here, we assume that all tasks have finished working.
    # if it is not the case, you should add a barrier to wait
    while not q.empty():
        internal_queue.append(q.get())
    internal_queue.sort(key=lambda item: (item.x), reverse=True)
    for elt in internal_queue:
        print(elt.x)
    print("Queue is now empty!")
Program prints:
Queue elements:
25.1
15.4
13.4
10.1
9.1
8.3
5.1
1.3
Queue is now empty!
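The snippet above assumes that all producers have finished before the queue is drained. Here is a minimal sketch of one way to enforce that, assuming each producer puts a sentinel value on the queue when it is done. The names SENTINEL, producer and collect_sorted are hypothetical and not part of the question's code, and for brevity the consumer runs in the main process rather than in a third process:

```python
import multiprocessing

SENTINEL = None  # hypothetical "this producer is done" marker


class Weed:
    def __init__(self, x, y):
        self.x = x
        self.y = y


def producer(q, coords):
    """Put one Weed per (x, y) pair on the queue, then signal completion."""
    for x, y in coords:
        q.put(Weed(x, y))
    q.put(SENTINEL)


def collect_sorted(q, n_producers):
    """Block until every producer has sent its sentinel, then sort by x (descending)."""
    items = []
    finished = 0
    while finished < n_producers:
        item = q.get()  # blocking get, so we never rely on q.empty()
        if item is SENTINEL:
            finished += 1
        else:
            items.append(item)
    items.sort(key=lambda weed: weed.x, reverse=True)
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=producer, args=(q, [(10.1, 7.3), (8.3, 2.8)])),
        multiprocessing.Process(target=producer, args=(q, [(25.1, 1), (1.3, 1)])),
    ]
    for w in workers:
        w.start()
    # Drain the queue before joining, so producers are never blocked on a full pipe
    for weed in collect_sorted(q, n_producers=len(workers)):
        print(weed.x)
    for w in workers:
        w.join()
```

Counting one sentinel per producer avoids guessing from q.empty() whether more elements are still on their way.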
[EDIT]
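In a real-world scenario, you don't want to wait for the producers to finish before starting the printing. However, you'll have to find a compromise between two problems: waiting too long before printing anything, and printing an element before others that should have come ahead of it have arrived.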
There is (IMHO) no optimal solution here. Here is one proposal, where the internal queue is regularly updated while giving the producers time to finish their work:
import time

def state_machine(q):
    """
    Function that sorts the queue (w.r.t x-coord.) and prints it out
    """
    internal_queue = []

    def update_internal_queue():
        # move everything currently in the shared queue to the local list, then re-sort
        while not q.empty():
            internal_queue.append(q.get())
        internal_queue.sort(key=lambda item: (item.x), reverse=True)

    # wait a bit for the beginning of the elements to arrive
    time.sleep(5)
    update_internal_queue()
    print("Queue elements:")
    while internal_queue:
        time.sleep(1) # we can optionally wait a bit before each print
        update_internal_queue() # see if other elements arrived in the meantime
        print(internal_queue.pop(0).x)
    print("Queue is now empty!")
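If many elements accumulate, re-sorting the whole list on every update and calling pop(0) can become costly. A possible variation, sketched here as an assumption rather than a recommendation, keeps the internal buffer as a heap via the standard heapq module. The helper names drain_into_heap and pop_largest are hypothetical, and a plain queue.Queue stands in for the multiprocessing queue in the demo:

```python
import heapq
import itertools
import queue  # provides the queue.Empty exception raised by get_nowait()

_tie_breaker = itertools.count()  # avoids comparing Weed objects when x values are equal


class Weed:
    def __init__(self, x, y):
        self.x = x
        self.y = y


def drain_into_heap(q, heap):
    """Move everything currently available in q onto the heap, without blocking."""
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            return
        # heapq is a min-heap, so negate x to pop the largest x first
        heapq.heappush(heap, (-item.x, next(_tie_breaker), item))


def pop_largest(heap):
    """Remove and return the buffered item with the largest x."""
    return heapq.heappop(heap)[2]


if __name__ == "__main__":
    q = queue.Queue()  # stand-in for multiprocessing.Queue in this demo
    for x in (10.1, 25.1, 8.3):
        q.put(Weed(x, 1))
    heap = []
    drain_into_heap(q, heap)
    while heap:
        print(pop_largest(heap).x)
        drain_into_heap(q, heap)  # pick up anything that arrived in the meantime
```

As with q.empty(), an empty result from get_nowait() only means nothing is available right now, not that the producers have finished.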