Tags: python, python-2.7, python-multiprocessing

How to sort a multiprocess queue based on an attribute of the object


With a regular list, I can sort based on an object's attribute with:

queue.sort(key=lambda weed: (weed.x_coord), reverse=True)

However, this is not possible with a multiprocessing queue, so how can I accomplish the same sorting with one? Or is it better to avoid a multiprocessing queue altogether if I want the queue sorted in the end?

The requirement is that the queue/list must be thread-safe and process-safe, since it will be populated by two producers running in parallel.

The two processes (p1 and p2) that insert the objects into the shared queue will keep running alongside a third process (the state machine) that reads from the queue (see code below), i.e. the state machine process will not wait for the p1 and p2 processes to end.
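As an aside: if the two producers were plain threads in a single process, the standard library's `queue.PriorityQueue` (thread-safe, but not process-safe) would already hand elements back in sorted order with no explicit sort call. A sketch of that simpler case; the `(-x, counter, weed)` keying is my own convention for descending order:

```python
import itertools
import queue
import threading

class Weed:
    def __init__(self, x, y):
        self.x = x
        self.y = y

pq = queue.PriorityQueue()
counter = itertools.count()  # tiebreaker so Weed objects are never compared

def producer(coords):
    for x, y in coords:
        # Negate x so the smallest key corresponds to the largest x
        pq.put((-x, next(counter), Weed(x, y)))

t1 = threading.Thread(target=producer, args=([(10.1, 7.3), (8.3, 2.8)],))
t2 = threading.Thread(target=producer, args=([(25.1, 1.0), (1.3, 1.0)],))
t1.start(); t2.start()
t1.join(); t2.join()

ordered = []
while not pq.empty():
    ordered.append(pq.get()[2].x)
print(ordered)  # [25.1, 10.1, 8.3, 1.3]
```

This does not solve the multiprocessing case (the standard library has no priority queue for processes), which is what the rest of the question is about.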

The implementation so far:

import multiprocessing

class Weed:
    def __init__(self, x, y):
        self.x = x
        self.y = y

def p1(q):
    """
    Inserts weeds into the shared queue.
    """
    q.put(Weed(10.1, 7.3))
    q.put(Weed(8.3, 2.8))
    q.put(Weed(5.1, 4.2))
    q.put(Weed(15.4, 5.0))

def p2(q):
    """
    Inserts weeds into the shared queue.
    """
    q.put(Weed(25.1, 1))
    q.put(Weed(1.3, 1))
    q.put(Weed(9.1, 1))
    q.put(Weed(13.4, 1))


def state_machine(q):
    """
    Function that sorts the queue (w.r.t x-coord.) and prints it out
    """
    print("Queue elements:")
    while not q.empty():
        q.sort(key=lambda x: x.x, reverse=True)  # AttributeError: 'Queue' object has no attribute 'sort'
        print(q.get().x)
    print("Queue is now empty!")

if __name__ == "__main__":

    # creating multiprocessing Queue
    q = multiprocessing.Queue()

    # creating new processes
    p1 = multiprocessing.Process(target=p1, args=(q,))
    p2 = multiprocessing.Process(target=p2, args=(q,))
    p3 = multiprocessing.Process(target=state_machine, args=(q,))

    # running process p1 to generate some weeds
    p1.start()


    # running process p2 to generate some weeds
    p2.start()


    # running process p3 to sort the weed queue (by x coord.) and print them out
    p3.start()


    p1.join()
    p2.join()
    p3.join()

Solution

  • In your example, the three processes do run concurrently (all of them are started before any is joined), so the state machine may start reading while the producers are still inserting.

    Be careful though: in a real-world case, an empty queue does not mean the other tasks have finished. You'll need another synchronisation mechanism.

    My suggestion would be to fall back to a regular list inside the state_machine function, transferring elements from the multiprocessing queue to the list as they arrive. You can then sort the list and print its elements in order. There are no concurrency issues, because the internal list is only touched by the process running state_machine.

    def state_machine(q):
        """
        Function that sorts the queue (w.r.t x-coord.) and prints it out
        """
        print("Queue elements:")
        internal_queue = []
        # here, we assume that all tasks have finished working.
        # if it is not the case, you should add a barrier to wait
        while not q.empty():
            internal_queue.append(q.get())
    
        internal_queue.sort(key=lambda item: (item.x), reverse=True)
        for elt in internal_queue:
            print(elt.x)
        print("Queue is now empty!")
    

    Program prints:

    Queue elements:
    25.1
    15.4
    13.4
    10.1
    9.1
    8.3
    5.1
    1.3
    Queue is now empty!
    

    [EDIT]

    In a real-world scenario, you don't want to wait for the producers to finish before you start printing. However, you'll have to find a compromise between two problems:

    • if you wait too long before starting consuming elements, you basically get back to waiting for the producers to finish.
    • if you consume the elements of your queue too fast (i.e. print them as soon as they arrive), your queue will tend to be empty most of the time, and sorting does not make much sense anymore.

    There is (IMHO) no optimal solution here. Below is a proposition where the internal queue is updated regularly, while giving the producers time to finish their work:

    import time

    def state_machine(q):
        """
        Function that sorts the queue (w.r.t. x-coord.) and prints it out
        """
        internal_queue = []

        def update_internal_queue():
            while not q.empty():
                internal_queue.append(q.get())
            internal_queue.sort(key=lambda item: item.x, reverse=True)
    
    
        # wait a bit for the beginning of the elements to arrive
        time.sleep(5)
        update_internal_queue()
        print("Queue elements:")
    
        while internal_queue:
            time.sleep(1) # we can optionally wait a bit before each print
            update_internal_queue() # see if other elements arrived in the meantime
            print(internal_queue.pop(0).x)
        print("Queue is now empty!")
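
A possible refinement (my own suggestion, not from the original answer): instead of re-sorting the whole internal list on every update, a heap keeps it ordered incrementally. `heapq` pops the smallest key first, so negating `x` preserves the descending order. A single-process sketch, using `queue.Queue` as a stand-in for the multiprocessing queue:

```python
import heapq
import itertools
import queue  # stand-in for multiprocessing.Queue in this demo

class Weed:
    def __init__(self, x, y):
        self.x = x
        self.y = y

counter = itertools.count()  # tiebreaker so Weed objects are never compared

def update_internal_heap(q, heap):
    """Drain q into heap; the smallest key (largest x) stays at the front."""
    while not q.empty():
        weed = q.get()
        heapq.heappush(heap, (-weed.x, next(counter), weed))

q = queue.Queue()
for x in (10.1, 8.3, 25.1, 1.3):
    q.put(Weed(x, 1.0))

heap = []
update_internal_heap(q, heap)
seen = []
while heap:
    update_internal_heap(q, heap)  # pick up anything that arrived between pops
    seen.append(heapq.heappop(heap)[2].x)
print(seen)  # [25.1, 10.1, 8.3, 1.3]
```

Each push and pop is O(log n), versus O(n log n) for a full re-sort on every call to update_internal_queue.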