Search code examples
python · multiprocessing

How do you terminate a Python process that uses queues? + Bug with queue size?


I'm just starting out with multiprocessing in Python. I use multiprocessing queues in my program, passing them from process to process. The problem is that when I call myprocess.join(), it only returns once the queue the process is writing to is empty. So I'm wondering what the reason is, why it's built that way, and how to get around it. I can drain all my queues in my main process first, but I'd prefer a cleaner solution.

Bug part: what I explained above is the behaviour for queues with more than 100 elements.

In my example, if you use an initial data list of 100 elements (range(100)), the p4.join() call returns as expected, but with 1000 or more elements it stops working as I explained. So I'm a little bit lost.

EDIT: The bug appears at an input data size of 533 or 534 elements.

Thanks,

Here a small example to show you.



import random
import multiprocessing
import time


def process1(data, queue1):
    """Producer stage: push every element of *data* onto *queue1*.

    A trailing ``None`` sentinel is enqueued last to signal
    end-of-stream to the downstream consumer.
    """
    put = queue1.put
    for element in data:
        put(element)
    put(None)


def process2(queue1, queue2):
    """Pipeline stage: double each value read from *queue1* onto *queue2*.

    Terminates on the ``None`` sentinel, which is forwarded downstream
    so the next stage also knows the stream has ended.
    """
    for value in iter(queue1.get, None):
        queue2.put(value * 2)
    queue2.put(None)


def process3(queue2, queue3):
    """Pipeline stage: double each value read from *queue2* onto *queue3*.

    Stops once the ``None`` sentinel arrives and passes it along so the
    following stage terminates too.
    """
    value = queue2.get()
    while value is not None:
        queue3.put(value * 2)
        value = queue2.get()
    queue3.put(None)


def process4(queue3, queue4):
    """Final stage: double each value read from *queue3* onto *queue4*.

    On the ``None`` sentinel it forwards the sentinel, logs that it was
    received, and returns; the closing log line always runs.
    """
    for value in iter(queue3.get, None):
        queue4.put(value * 2)
    queue4.put(None)
    print("None received")
    print("End process4")


if __name__ == "__main__":
    start_time = time.time()

    # 10k random ints -- large enough that queue4's buffered contents exceed
    # the underlying pipe's capacity, which is what triggers the join() hang
    # described in the question (reported threshold: ~533 items).
    data = [random.randint(1, 10000) for _ in range(10000)]

    queue1 = multiprocessing.Queue()
    queue2 = multiprocessing.Queue()
    queue3 = multiprocessing.Queue()
    queue4 = multiprocessing.Queue()

    # Four-stage pipeline: p1 produces, p2/p3/p4 each double and forward.
    p1 = multiprocessing.Process(target=process1, args=(data, queue1))
    p2 = multiprocessing.Process(target=process2, args=(queue1, queue2))
    p3 = multiprocessing.Process(target=process3, args=(queue2, queue3))
    p4 = multiprocessing.Process(target=process4, args=(queue3, queue4))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print("Proc started")

    p1.join()
    print("p1 join")
    p2.join()
    print("p2 join")
    p3.join()
    print("p3 join")
    # BUG (the subject of this question): this join deadlocks for large
    # inputs. p4's queue feeder thread cannot flush queue4's buffered items
    # into the pipe until someone reads from queue4, but this process only
    # starts reading *after* the join below -- see "Joining processes that
    # use queues" in the multiprocessing programming guidelines.
    p4.join()
    print("p4 join")

    results = []
    print("While loop")
    while True:
        item = queue4.get()
        if item is None:
            break
        results.append(item)

    end_time = time.time()

    print(end_time - start_time)




Solution

  • The issue is described in https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming in section "Joining processes that use queues".

    The basic idea is the process will wait before terminating until all the buffered items are fed by the "feeder" thread to the underlying pipe.

    import multiprocessing
    import random
    import time
    
    
    def process1(data, queue1):
        """Producer: put every item of *data* on queue1, then a None sentinel."""
        for item in data:
            queue1.put(item)

        # End-of-stream marker for the downstream stage.
        queue1.put(None)
    
    
    def process2(queue1, queue2):
        """Stage 2: double each value from queue1 onto queue2; forward the sentinel."""
        while True:
            item = queue1.get()
            if item is None:
                # Propagate end-of-stream before exiting.
                queue2.put(None)
                break
            queue2.put(item * 2)
    
    
    def process3(queue2, queue3):
        """Stage 3: double each value from queue2 onto queue3; forward the sentinel."""
        while True:
            item = queue2.get()
            if item is None:
                # Propagate end-of-stream before exiting.
                queue3.put(None)
                break
            queue3.put(item * 2)
    
    
    def process4(queue3, queue4):
        """Final stage: double each value from queue3 onto queue4, then log and exit."""
        # calling queue4.cancel_join_thread() will correctly terminate the process
        # queue4.cancel_join_thread()
        # BUT not all items are put into the queue4 (rest are discarded)

        # see https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
        # section "Joining processes that use queues"

        while True:
            item = queue3.get()
            if item is None:
                # Forward the end-of-stream sentinel to the main process.
                queue4.put(None)
                print("None received")
                break
            queue4.put(item * 2)
        print("End process4")
        # Shows how many items are still buffered in queue4 when this
        # process body finishes -- the feeder thread flushes them afterwards.
        print(queue4.qsize())
    
    
    if __name__ == "__main__":
        start_time = time.time()

        data = [random.randint(1, 10000) for _ in range(10000)]

        queue1 = multiprocessing.Queue()
        queue2 = multiprocessing.Queue()
        queue3 = multiprocessing.Queue()
        queue4 = multiprocessing.Queue()

        # Four-stage pipeline: p1 produces, p2/p3/p4 each double and forward.
        p1 = multiprocessing.Process(target=process1, args=(data, queue1))
        p2 = multiprocessing.Process(target=process2, args=(queue1, queue2))
        p3 = multiprocessing.Process(target=process3, args=(queue2, queue3))
        p4 = multiprocessing.Process(target=process4, args=(queue3, queue4))

        p1.start()
        p2.start()
        p3.start()
        p4.start()
        print("Proc started")

        p1.join()
        print("p1 join")
        p2.join()
        print("p2 join")
        p3.join()
        print("p3 join")

        # don't call p4.join() here
        # p4.join()
        # print("p4 join")

        # Drain queue4 first: consuming its items lets p4's feeder thread
        # flush its buffer into the pipe, so p4 can actually terminate.
        results = []
        print("While loop")
        while True:
            item = queue4.get()
            if item is None:
                break
            results.append(item)

        end_time = time.time()

        # call p4.join() here instead:
        # the queue has been drained above, so this join returns immediately.
        p4.join()
        print("p4 join")

        print(end_time - start_time)
    

    Prints:

    Proc started
    p1 join
    None received
    End process4
    10001
    p2 join
    p3 join
    While loop
    p4 join
    0.1303730010986328