Tags: python, multiprocessing, queue, python-multiprocessing

multiprocessing hanging at join


Before anyone marks this as a duplicate: I have been looking at StackOverflow posts for days and haven't found a satisfying answer.

I have a program that at some point takes individual strings (along with many other arguments and objects), does some complicated processing on them, and returns one or more strings. Because each string is processed separately, using multiprocessing seems natural here, especially since I work on machines with over 100 cores.

The following is a minimal example. It works with up to 12 to 15 cores; if I give it more cores, it hangs at p.join(). I know it hangs at join because I added debug prints before and after the join, and the output stops between the two.

Minimal example:

import os, random, sys, time, string
import multiprocessing as mp

letters = string.ascii_uppercase
align_len = 1300

def return_string(queue):
    n_strings = [1,2,3,4]
    alignments = []

    # generating 1 to 4 sequences randomly, each sequence of length 1300
    # the original code might even produce more than 4, but 1 to 4 is an average case
    # instead of the random string there will be some complicated function called
    # in the original code
    for i in range(random.choice(n_strings)):
        alignment = ""
        for i in range(align_len):
            alignment += random.choice(letters)
        alignments.append(alignment)

    for a in alignments:
        queue.put(a)


def run_string_gen(cores):
    processes = []
    queue = mp.Queue()
    # running the target function 1000 times
    for i in range(1000):
        # print(i)
        process = mp.Process(target=return_string, args = (queue,))
        processes.append(process)
        if len(processes) == cores:
            counter = len(processes)
            for p in processes:
                p.start()

            for p in processes:
                p.join()

            while queue.qsize() != 0:
                a = queue.get()
                # the original idea is that instead of print
                # I will be writing to a file that is already open
                print(a)

            processes = []
            queue = mp.Queue()

    # any leftovers processes
    if processes:
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        while queue.qsize() != 0:
            a = queue.get()
            print(a)

if __name__ == "__main__":
    cores = int(sys.argv[1])
    if cores > os.cpu_count():
        cores = os.cpu_count()
    start = time.perf_counter()
    run_string_gen(cores)
    print(f"it took {time.perf_counter() - start}")

My suspicion is that the queue is getting full, but it is not that many strings: with 20 cores it hangs, yet that is at most 20*4=80 strings (if the choice was always 4). Is that really enough strings to fill the queue?

Assuming the queue is getting full, I am not sure at which point I should check and empty it. Doing it inside return_string seems to be a bad idea, as other processes also have the queue and might be emptying or filling it at the same time. Do I use lock.acquire() and lock.release() then? These strings will be added to a file, so I could avoid the queue and output the strings directly to a file. However, because starting a process means copying objects, I cannot pass an _io.TextIOWrapper object (an open file to append to); instead I would need to open and close the file inside return_string while syncing with lock.acquire() and lock.release(), and repeatedly opening and closing the output file just to write to it seems wasteful.

Some of the suggested solutions out there:

1- Draining the queue before joining is one of the answers I found. However, I cannot anticipate how long each process will take, and adding a sleep after the p.start() loop and before p.join() is bad (at least for my code): if the processes finish quickly and I am still waiting, that is a lot of time wasted, and the whole point here is speed.

2- Adding some kind of sentinel value, e.g. None, to know when a worker has finished. I didn't understand this part: if I run the target function 10 times for 10 cores, I will have 10 sentinels, but the problem is that the program hangs and never gets to the point of emptying the queue and checking for the sentinels.

Any suggestions or ideas on what to do here?


Solution

  • Read the documentation for `multiprocessing.Queue` carefully, in particular the second warning, which says in part:

    Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

    This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
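
The behavior described in the warning can be reproduced in a few lines. The following is a minimal sketch, not part of the original answer: the names `producer`, `demo` and `PAYLOAD_BYTES` are made up for illustration, and the payload size is simply chosen to be far larger than any typical pipe buffer. The timeout on `join()` is there only so that the demonstration itself does not hang.

```python
import multiprocessing as mp

# Hypothetical size, chosen to be much larger than a typical pipe buffer.
PAYLOAD_BYTES = 1_000_000


def producer(queue):
    # put() returns immediately; a background feeder thread in this process
    # then writes the pickled item to the pipe underlying the queue.
    queue.put("x" * PAYLOAD_BYTES)


def demo():
    queue = mp.Queue()
    p = mp.Process(target=producer, args=(queue,))
    p.start()

    # Joining BEFORE reading: the child cannot terminate, because its feeder
    # thread is still blocked trying to flush the item into the full pipe.
    p.join(timeout=2)
    alive_before_get = p.is_alive()

    # Reading the item drains the pipe, which lets the child finish.
    item = queue.get()
    p.join(timeout=30)
    alive_after_get = p.is_alive()

    return alive_before_get, alive_after_get, len(item)


if __name__ == "__main__":
    print(demo())
```

The first element of the result should be True (the child is still alive after the timed join) and the second False (it exits once the item has been read). Without the timeout, the first `join()` would block forever, which is the hang seen in the question.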

    In simple terms, your program violates this warning by joining the processes before it has read the items from the queue. With only a few processes the data happens to fit in the buffer of the pipe underlying the queue, so the children can flush and exit; with more processes the buffer fills up, the children block while flushing, and join() never returns. You must reverse the order of operations: read first, then join.

    The problem then becomes how the main process knows when to stop reading while the subprocesses are still running and writing to the queue. The simplest solution is for each subprocess to write a special sentinel record as its final item, signaling that it will write no more items. The main process can then simply do blocking reads until it has seen N sentinel records, where N is the number of processes it has started that write to the queue. The sentinel just has to be a unique record that cannot be mistaken for a normal item to be processed. None will suffice for that purpose:

    import os, random, sys, time, string
    import multiprocessing as mp
    
    letters = string.ascii_uppercase
    align_len = 1300
    
    SENTINEL = None # no more records sentinel
    
    def return_string(queue):
        n_strings = [1,2,3,4]
        alignments = []
    
        # generating 1 to 4 sequences randomly, each sequence of length 1300
        # the original code might even produce more than 4, but 1 to 4 is an average case
        # instead of the random string there will be some complicated function called
        # in the original code
        for i in range(random.choice(n_strings)):
            alignment = ""
            for i in range(align_len):
                alignment += random.choice(letters)
            alignments.append(alignment)
    
        for a in alignments:
            queue.put(a)
        # show this process is through writing records:
        queue.put(SENTINEL)
    
    
    def run_string_gen(cores):
        processes = []
        queue = mp.Queue()
        # running the target function 1000 times
        for i in range(1000):
            # print(i)
            process = mp.Process(target=return_string, args = (queue,))
            processes.append(process)
            if len(processes) == cores:
                for p in processes:
                    p.start()
    
                seen_sentinel_count = 0
                while seen_sentinel_count < len(processes):
                    a = queue.get()
                    if a is SENTINEL:
                        seen_sentinel_count += 1
                    # the original idea is that instead of print
                    # I will be writing to a file that is already open
                    else:
                        print(a)
    
                for p in processes:
                    p.join()
    
                processes = []
                # The same queue can be reused:
                #queue = mp.Queue()
    
        # any leftovers processes
        if processes:
            for p in processes:
                p.start()
    
            seen_sentinel_count = 0
            while seen_sentinel_count < len(processes):
                a = queue.get()
                if a is SENTINEL:
                    seen_sentinel_count += 1
                else:
                    print(a)
    
            for p in processes:
                p.join()
    
    if __name__ == "__main__":
        cores = int(sys.argv[1])
        if cores > os.cpu_count():
            cores = os.cpu_count()
        start = time.perf_counter()
        run_string_gen(cores)
        print(f"it took {time.perf_counter() - start}")
    

    Prints:

    ...
    NEUNBZVXNHCHVIGNDCEUXJSINEJQNCOWBMUJRTIASUEJHDJUWZIYHHZTJJSJXALZHOEVGMHSVVMMIFZGLGLJDECEWSVZCDRHZWVOMHCDLJVQLQIQCVKBEVOVDWTMFPWIWIQFOGWAOPTJUWKAFBXPWYDIENZTTJNFAEXDVZHXHJPNFDKACCTRTOKMVDGBQYJQMPSQZKDNDYFVBCFMWCSCHTVKURPJDBMRWFQAYIIALHDJTTMSIAJAPLHUAJNMHOKLZNUTRWWYURBTVQHWECAFHQPOZZLVOQJWVLFXUEQYKWEFXQPHKRRHBBCSYZOHUDIFOMBSRNDJNBHDUYMXSMKUOJZUAPPLOFAESZXIETOARQMBRYWNWTSXKBBKWYYKDNLZOCPHDVNLONEGMALL
    it took 32.7125509
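
As for why the original version only started hanging beyond a dozen or so processes, a rough back-of-the-envelope estimate is sketched below. It rests on assumptions that are not from the original post: a pipe capacity of 64 KiB (the Linux default; other platforms differ), a 4-byte length prefix per message (a CPython implementation detail), and no allowance for how the kernel packs writes into pages. All names are hypothetical.

```python
import pickle

ALIGN_LEN = 1300                    # string length used in the question
MAX_STRINGS_PER_PROCESS = 4         # worst case per process in the question
ASSUMED_PIPE_CAPACITY = 64 * 1024   # assumption: Linux default pipe capacity
ASSUMED_HEADER_BYTES = 4            # assumption: per-message length prefix


def bytes_per_item(n_chars=ALIGN_LEN):
    # Each queue item travels through the pipe as a pickled message.
    return len(pickle.dumps("A" * n_chars)) + ASSUMED_HEADER_BYTES


def items_that_fit(capacity=ASSUMED_PIPE_CAPACITY):
    # Number of unread strings the pipe can hold before writers block.
    return capacity // bytes_per_item()


def processes_always_safe(capacity=ASSUMED_PIPE_CAPACITY):
    # Number of processes that can flush everything even in the worst case.
    return items_that_fit(capacity) // MAX_STRINGS_PER_PROCESS


if __name__ == "__main__":
    print("bytes per item:", bytes_per_item())
    print("items that fit:", items_that_fit())
    print("processes that can always flush:", processes_always_safe())
```

Under these assumptions the pipe holds a little under 50 such strings, so about a dozen processes writing at most four strings each can always flush and exit, which is in line with the 12 to 15 cores reported in the question. The exact threshold varies by platform and is not something to rely on; reading before joining is the actual fix.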
    

    Update

    The same code done using a multiprocessing pool, which obviates having to re-create processes:

    import os, random, sys, time, string
    import multiprocessing as mp
    
    letters = string.ascii_uppercase
    align_len = 1300
    
    
    def return_string():
        n_strings = [1,2,3,4]
        alignments = []
    
        # generating 1 to 4 sequences randomly, each sequence of length 1300
        # the original code might even produce more than 4, but 1 to 4 is an average case
        # instead of the random string there will be some complicated function called
        # in the original code
        for i in range(random.choice(n_strings)):
            alignment = ""
            for i in range(align_len):
                alignment += random.choice(letters)
            alignments.append(alignment)
    
        return alignments
    
    
    def run_string_gen(cores):
        def my_callback(result):
            alignments = result
            for alignment in alignments:
                print(alignment)
    
        pool = mp.Pool(cores)
        for i in range(1000):
            pool.apply_async(return_string, callback=my_callback)
        # wait for completion of all tasks:
        pool.close()
        pool.join()
    
    if __name__ == "__main__":
        cores = int(sys.argv[1])
        if cores > os.cpu_count():
            cores = os.cpu_count()
        start = time.perf_counter()
        run_string_gen(cores)
        print(f"it took {time.perf_counter() - start}")
    

    Prints:

    ...
    OMCRIHWCNDKYBZBTXUUYAGCMRBMOVTDOCDYFGRODBWLIFZZBDGEDVAJAJFXWJRFGQXTSCCJLDFKMOENGAGXAKKFSYXEQOICKWFPSKOHIMCRATLVLVLMGFAWBDIJMZMVMHCXMTVJBSWXTLDHEWYHUMSQZGGFWRMOHKKKGMTFEOTTJDOQMOWWLKTOWHKCIUNINHTGUZHTBGHROPVKQBNEHQWIDCZUOJGHUXLLDGHCNWIGFUCAQAZULAEZPIP
    it took 2.1607988999999996
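
Since the stated goal is to write the strings to a file that is already open, one possible variation is to iterate over `pool.imap_unordered` in the parent and write each result as it arrives. This is a sketch rather than the answer's code: `make_alignments`, `write_all`, the `seed` argument and the temporary output file are hypothetical stand-ins. A side benefit is error handling: with `apply_async` and only a `callback`, an exception raised in a worker goes unnoticed unless an `error_callback` is supplied or `get()` is called on the result, whereas iterating over `imap_unordered` re-raises it in the parent.

```python
import multiprocessing as mp
import os
import random
import string
import tempfile


def make_alignments(seed):
    # Hypothetical stand-in for the real work: 1 to 4 strings of length 1300.
    rng = random.Random(seed)
    count = rng.randint(1, 4)
    return ["".join(rng.choices(string.ascii_uppercase, k=1300))
            for _ in range(count)]


def write_all(n_tasks, cores, out_path):
    """Run n_tasks tasks on a pool and write every string to out_path."""
    written = 0
    with mp.Pool(cores) as pool, open(out_path, "w") as out:
        # Results arrive in completion order; only the parent touches the
        # file, so no lock is needed.
        for alignments in pool.imap_unordered(make_alignments, range(n_tasks)):
            for alignment in alignments:
                out.write(alignment + "\n")
                written += 1
    return written


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "alignments.txt")
    print(write_all(100, 4, path), "strings written to", path)
```

Because the file is opened and written only in the parent, the open file object never has to be passed to a child process.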