
How to implement multiprocessing IPC with multiple queues?


I'm new to multiprocessing and multithreading. For learning purposes, I'm trying to implement IPC using queues.

Code


from multiprocessing import Process, Queue, Lock

import math



def calculate_square(sq_q, sqrt_q):
    itm = sq_q.get()
    print(f"Calculating sq of: {itm}")
    square = itm * itm
    sqrt_q.put(square)

def calculate_sqroot(sqrt_q, result_q):
    itm = sqrt_q.get()
    print(f"Calculating sqrt of: {itm}")
    sqrt = math.sqrt(itm)
    result_q.put(sqrt)



sq_q = Queue()
sqrt_q = Queue()
result_q = Queue()


for i in range(5, 20):
    sq_q.put(i)


p_sq = Process(target=calculate_square, args=(sq_q, sqrt_q))
p_sqrt = Process(target=calculate_sqroot, args=(sqrt_q, result_q))


p_sq.start()
p_sqrt.start()



p_sq.join()
p_sqrt.join()

while not result_q.empty():
    print(result_q.get())

Explanation

Here I'm trying to run two functions in two separate processes: the first calculates the square of each number, and the second calculates the square root of that square.

Queues

  • sq_q: Queue containing the initial numbers whose squares are to be calculated.
  • sqrt_q: Queue containing the numbers whose square roots are to be calculated.
  • result_q: Queue containing the final results.

Issue

Only the first item of sq_q is consumed.

Output:

5.0

I expect the output to be: [5, 6, 7, 8, .. , 19]

Please note that this is purely for learning purposes. I want to implement the IPC with multiple queues, even though the same result could be achieved with shared objects, locks, and arrays.


Solution

  • Each function calls get() only once, so only the first value, 5, is taken. To process every value, loop until the queue has been drained.

    while not sq_q.empty():
        itm = sq_q.get()
        print(f"Calculating sq of: {itm}")
        square = itm * itm
        sqrt_q.put(square)
    

    The same goes for the other function, but here the loop runs until result_q is full, so give result_q a maximum size for the condition to work. The final results will then be in result_q.

    while not result_q.full():
        itm = sqrt_q.get()
        print(f"Calculating sqrt of: {itm}")
        sqrt = math.sqrt(itm)
        result_q.put(sqrt)
    

    Complete Code

    import math
    from multiprocessing import Process, Queue
    
    
    def calculate_square(sq_q, sqrt_q):
        while not sq_q.empty():
            itm = sq_q.get()
            print(f"Calculating sq of: {itm}")
            square = itm * itm
            sqrt_q.put(square)
    
    
    def calculate_sqroot(sqrt_q, result_q):
        while not result_q.full():
            itm = sqrt_q.get()
            print(f"Calculating sqrt of: {itm}")
            sqrt = math.sqrt(itm)
            result_q.put(sqrt)
    
    
    if __name__ == "__main__":
        sq_q = Queue()
        sqrt_q = Queue()
        result_q = Queue(5)
    
        for i in range(5):
            sq_q.put(i)
    
        p_sq = Process(target=calculate_square, args=(sq_q, sqrt_q))
        p_sqrt = Process(target=calculate_sqroot, args=(sqrt_q, result_q))
    
        p_sq.start()
        p_sqrt.start()
    
        p_sq.join()
        p_sqrt.join()
    
        while not result_q.empty():
            print(result_q.get())
    
    

    Output

    Calculating sq of: 0
    Calculating sq of: 1
    Calculating sq of: 2
    Calculating sq of: 3
    Calculating sq of: 4
    Calculating sqrt of: 0
    Calculating sqrt of: 1
    Calculating sqrt of: 4
    Calculating sqrt of: 9
    Calculating sqrt of: 16
    0.0
    1.0
    2.0
    3.0
    4.0
    

    Edit:

    Because calculate_sqroot now loops until result_q is full, a delay is no longer needed.
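
A caveat on the loops above: the multiprocessing documentation describes Queue.empty() and Queue.full() as not reliable, so with unlucky timing a worker may stop early or block. One common alternative is to pass an end-of-stream marker through the pipeline. The following is only a minimal sketch of that idea, not the method used in the answer above. SENTINEL is a hypothetical name introduced here, and it assumes None never occurs as real data. The sketch also drains result_q before calling join(), since the documentation warns that joining a process that still has buffered queue items can deadlock.

```python
import math
from multiprocessing import Process, Queue

# Hypothetical end-of-stream marker (an assumption: None is never real data).
SENTINEL = None


def calculate_square(sq_q, sqrt_q):
    # Block on get() until the marker arrives instead of polling empty().
    while True:
        itm = sq_q.get()
        if itm is SENTINEL:
            sqrt_q.put(SENTINEL)  # forward the marker to the next stage
            break
        sqrt_q.put(itm * itm)


def calculate_sqroot(sqrt_q, result_q):
    while True:
        itm = sqrt_q.get()
        if itm is SENTINEL:
            result_q.put(SENTINEL)  # tell the parent that no more results follow
            break
        result_q.put(math.sqrt(itm))


if __name__ == "__main__":
    sq_q, sqrt_q, result_q = Queue(), Queue(), Queue()

    for i in range(5, 20):
        sq_q.put(i)
    sq_q.put(SENTINEL)  # mark the end of the input

    p_sq = Process(target=calculate_square, args=(sq_q, sqrt_q))
    p_sqrt = Process(target=calculate_sqroot, args=(sqrt_q, result_q))
    p_sq.start()
    p_sqrt.start()

    # Collect results before join() so the workers can flush their queues.
    results = []
    while True:
        itm = result_q.get()
        if itm is SENTINEL:
            break
        results.append(itm)

    p_sq.join()
    p_sqrt.join()

    print(results)  # the values 5.0 through 19.0, in order
```

With this layout result_q no longer needs a maximum size, and the number of inputs can change without touching the workers. If several workers were to read from the same queue, each of them would need its own marker.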