Issue with Semaphore in Python Multiprocessing


I'm trying to use a Semaphore to limit the number of concurrent processes for a task that I'm trying to carry out. However, I'm not getting the behaviour that I expect, and even worse, it is not consistent across machines.

import multiprocessing
import time

def test_print():
    Sema.acquire()
    print("starting!")
    time.sleep(5)
    print("ending!")
    Sema.release()


concurrency = 3
Sema = multiprocessing.Semaphore(concurrency)
processList = []

if __name__ = "___main__":
    for _ in range(10):
        p = multiprocessing.Process(target = test_print)
        p.start()
        processList.append(p)

I expect

starting!
starting!
starting!
ending!
ending!
ending!
starting!
starting!
starting!
ending!
ending!
ending!
starting!
starting!
starting!
ending!
ending!
ending!
starting!
ending!

but I get on my Linux server

starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!


and on my Macbook:

starting!
starting!
starting!
starting!
starting!
starting!
starting!
starting!
starting!
starting!
ending!
ending!
ending!
ending!
ending!
ending!
ending!
ending!
ending!
ending!

Any help is appreciated, in particular if I am having some conceptual misunderstanding of what the Semaphore is doing (I'm a newbie to multiprocessing). Thanks a lot!


Solution

  • First, I don't know how your posted code can run at all. You have:

    if __name__ = "___main__": # two errors here: = -> ==, "___main__" -> "__main__"
    

    i.e. this should be:

    if __name__ == "__main__":
    

    When you are running on a platform that uses the spawn method to start child processes (the default on Windows, and on macOS since Python 3.8), each child process re-imports the main module and therefore re-executes everything at global scope that is not inside the above if __name__ == "__main__": block. That means each process creates and uses its own semaphore, so nothing is ever limited, which is consistent with the output on your MacBook. You need to create the semaphore in the main process and pass it as an argument to your "worker" function, test_print:

    import multiprocessing
    import time
    
    def test_print(semaphore):
        with semaphore:
            print("starting!")
            time.sleep(5)
            print("ending!")
    
    
    if __name__ == "__main__":
        concurrency = 3
        semaphore = multiprocessing.Semaphore(concurrency)
        processList = []
        for _ in range(10):
            p = multiprocessing.Process(target=test_print, args=(semaphore,))
            p.start()
            processList.append(p)
        for p in processList:
            p.join()
    

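    Prints (the exact interleaving varies from run to run, but at no point are more than three processes between "starting!" and "ending!"):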

    starting!
    starting!
    starting!
    ending!
    starting!
    ending!
    starting!
    ending!
    starting!
    ending!
    ending!
    ending!
    starting!
    starting!
    starting!
    ending!
    ending!
    ending!
    starting!
    ending!
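
Reading the interleaved output by eye is error-prone, so here is a minimal sketch of one way to check the limit programmatically. It is only an illustration under my own assumptions: the names worker, peak_concurrency, active, peak and hold_seconds are mine, not from your code. Each worker bumps a shared counter while it holds the semaphore, and the highest value ever seen is recorded.

```python
import multiprocessing
import time


def worker(semaphore, active, peak, hold_seconds):
    # At most `limit` workers can be inside this block at the same time.
    with semaphore:
        with active.get_lock():
            active.value += 1
            # peak is only ever modified while active's lock is held
            peak.value = max(peak.value, active.value)
        time.sleep(hold_seconds)
        with active.get_lock():
            active.value -= 1


def peak_concurrency(limit, n_workers, hold_seconds=0.2, start_method=None):
    # start_method=None means "use the platform default" (fork, spawn, ...)
    ctx = multiprocessing.get_context(start_method)
    semaphore = ctx.Semaphore(limit)
    active = ctx.Value("i", 0)  # workers currently holding the semaphore
    peak = ctx.Value("i", 0)    # highest value `active` has reached
    processes = [
        ctx.Process(target=worker, args=(semaphore, active, peak, hold_seconds))
        for _ in range(n_workers)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return peak.value


if __name__ == "__main__":
    print("peak concurrency:", peak_concurrency(limit=3, n_workers=10))
```

Because the semaphore and both counters are created in the parent and passed in as arguments, the reported peak should never exceed limit, whichever start method is in use.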
    

    Update

    I only just now saw the comment posted by Charchit Agarwal, which is exactly what I have said.
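
If you want to confirm which start method a given machine is using, something along these lines should do it. This is a small sketch: multiprocessing.get_start_method() is the real call, while NOTES and start_method_note are hypothetical names I made up for illustration.

```python
import multiprocessing

# Hypothetical lookup table (not part of multiprocessing) summarising what
# each start method means for objects created at module level.
NOTES = {
    "fork": "children start as a copy of the parent, so module-level objects are inherited",
    "spawn": "children re-import the main module, so module-level code runs again in each child",
}


def start_method_note():
    # Returns the start method currently in effect plus a short description.
    method = multiprocessing.get_start_method()
    return method, NOTES.get(method, "see the multiprocessing docs for this start method")


if __name__ == "__main__":
    method, note = start_method_note()
    print(f"{method}: {note}")
```

Passing the semaphore as an argument, as in the code above, works regardless of what this reports, so it is the safer habit.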