Tags: python, python-3.x, multiprocessing, python-multiprocessing

multiprocessing value hangs with lock


I've read the documentation here, and it seems that to make sure a Value does not hang we need to use a lock. I did just that, but it still gets stuck:

from multiprocessing import Process, Value, freeze_support, Lock
nb_threads = 3
nbloops = 10
v = Value('i', 0)

def run_process(lock):
    global nbloops
    i = 0
    while i < nbloops:
        # do stuff
        i += 1
        with lock:
            v.value += 1
        # wait for all the processes to finish doing something
        while v.value % nb_threads != 0:
            pass

if __name__ == '__main__':
    freeze_support()
    processes = []
    lock = Lock()

    for i in range(0, 3):
        processes.append( Process( target=run_process, args=(lock,) ) )
    for process in processes:
        process.start()
    for process in processes:
        process.join()

I've also tried reading the value while holding the lock, but it still blocks:

        val = -1
        while val % nb_threads != 0:
            with lock:
                val = v.value

How can I fix this? Thanks


Solution

  • Your code has a race condition: nothing guarantees that all three processes have left the while v.value % nb_threads != 0 loop before any of them moves on. One or two processes can therefore start the next iteration of the while i < nbloops loop and increment v.value again. After that, v.value is no longer a multiple of nb_threads, so the remaining processes never break out of their own while v.value % nb_threads != 0 loop. The processes that ran ahead then spin waiting for the next multiple, which can only be reached if the stuck processes increment, so everything hangs. This kind of synchronization is best handled by a Barrier rather than by looping and repeatedly checking the value.
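
To make that interleaving concrete, here is a small single-process replay of one possible schedule. It is only an illustration: the labels A, B and C and the variable names are made up for this sketch, and a real run may interleave differently.

```python
# Deterministic replay of one bad interleaving (no real processes involved).
nb_threads = 3
v = 0

# Round 1: A, B and C each finish their work and increment the counter.
v += 1  # A
v += 1  # B
v += 1  # C, so v == 3

# A happens to check first, sees a multiple of 3 and leaves the spin loop.
a_released = (v % nb_threads == 0)

# Before B and C look at v, A finishes round 2 and increments again.
v += 1  # A in round 2, so v == 4

# B and C check now: 4 % 3 != 0, so they keep spinning.
b_released = (v % nb_threads == 0)
c_released = (v % nb_threads == 0)

# A is now spinning too, waiting for v == 6, which needs B and C to increment.
print(a_released, b_released, c_released, v)  # True False False 4
```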

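    Also, multiprocessing.Value has built-in synchronization by default, and you can access the Lock it uses by calling Value.get_lock(), so there is no need to pass a Lock of your own to each process. Individual reads and writes are protected automatically, but v.value += 1 is a read followed by a write, so it still has to be done while holding that lock. Putting it together, you have: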

    from multiprocessing import Process, Value, freeze_support, Barrier

    nb_threads = 3  # number of worker processes
    nbloops = 10

    def run_process(v, barrier):
        i = 0
        while i < nbloops:
            # do stuff
            i += 1
            # "+=" is a read followed by a write, so hold the Value's own lock
            with v.get_lock():
                v.value += 1
            # wait for all the processes to finish this iteration
            barrier.wait()

    if __name__ == '__main__':
        freeze_support()
        # Created here and passed explicitly so every child shares the same
        # objects under the "spawn" start method too, where module-level
        # globals are re-created in each child.
        v = Value('i', 0)
        b = Barrier(nb_threads)

        processes = []
        for i in range(nb_threads):
            processes.append(Process(target=run_process, args=(v, b)))
        for process in processes:
            process.start()
        for process in processes:
            process.join()
    

    The Barrier guarantees that no process can move on to the next iteration of the loop until all of them have called Barrier.wait(), at which point all three are released together. A Barrier resets itself once it has released its waiters, so wait() can safely be called on each iteration.
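
As a rough way to check this behaviour, the sketch below reuses one Barrier across every iteration and then confirms the final count in the parent. The names worker, run, NB_PROCESSES and NB_LOOPS, and the 30-second timeout, are assumptions made for this example and not part of the original code. The timeout is optional; it only makes wait() raise BrokenBarrierError instead of blocking forever if one worker dies.

```python
from multiprocessing import Barrier, Process, Value

NB_PROCESSES = 3  # hypothetical names chosen for this sketch
NB_LOOPS = 10


def worker(counter, barrier):
    for _ in range(NB_LOOPS):
        # Hold the Value's own lock around the read-modify-write.
        with counter.get_lock():
            counter.value += 1
        # The same Barrier is reused on every iteration; the timeout
        # (in seconds) is an arbitrary safety net for this example.
        barrier.wait(timeout=30)


def run():
    counter = Value('i', 0)
    barrier = Barrier(NB_PROCESSES)
    procs = [Process(target=worker, args=(counter, barrier))
             for _ in range(NB_PROCESSES)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == '__main__':
    # Every worker increments once per loop, so the total is 3 * 10.
    print(run())  # 30
```

If the printed total equals NB_PROCESSES * NB_LOOPS and the script exits, every process passed the barrier on every iteration and no increment was lost.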