I'm trying to implement a Semaphore that limits the concurrent processes for a task that I'm trying to carry out. However, I'm not getting the behaviour that I expect, and even worse, it is not consistent.
import multiprocessing
import time

def test_print():
    Sema.acquire()
    print("starting!")
    time.sleep(5)
    print("ending!")
    Sema.release()

concurrency = 3
Sema = multiprocessing.Semaphore(concurrency)
processList = []
if __name__ = "___main__":
    for _ in range(10):
        p = multiprocessing.Process(target = test_print)
        p.start()
        processList.append(p)
I expect:
starting!
starting!
starting!
ending!
ending!
ending!
starting!
starting!
starting!
ending!
ending!
ending!
starting!
starting!
starting!
ending!
ending!
ending!
starting!
ending!
but on my Linux server I get:
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
and on my MacBook:
starting!
starting!
starting!
starting!
starting!
starting!
starting!
starting!
starting!
starting!
ending!
ending!
ending!
ending!
ending!
ending!
ending!
ending!
ending!
ending!
Any help is appreciated, in particular if I am having some conceptual misunderstanding of what the Semaphore is doing (I'm a newbie to multiprocessing). Thanks a lot!
First, I don't know how your posted code can run at all. You have:
if __name__ = "___main__": # two errors here: = -> ==, "___main__" -> "__main__"
i.e. this should be:
if __name__ == "__main__":
When you are running on a platform that uses the spawn method to start child processes, each child process re-imports the main module and therefore executes everything defined at global scope that is not inside the above if __name__ == "__main__": block. That means that each process creates and uses its own semaphore, so nothing is actually being limited. Clearly, this is a problem. You need to create the semaphore in the main process and pass it as an argument to your "worker" function, test_print:
import multiprocessing
import time

def test_print(semaphore):
    with semaphore:
        print("starting!")
        time.sleep(5)
        print("ending!")

if __name__ == "__main__":
    concurrency = 3
    semaphore = multiprocessing.Semaphore(concurrency)
    processList = []
    for _ in range(10):
        p = multiprocessing.Process(target=test_print, args=(semaphore,))
        p.start()
        processList.append(p)
    for p in processList:
        p.join()
Prints:
starting!
starting!
starting!
ending!
starting!
ending!
starting!
ending!
starting!
ending!
ending!
ending!
starting!
starting!
starting!
ending!
ending!
ending!
starting!
ending!
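Because the interleaving of the printed lines depends on scheduling, the output alone is not a reliable way to confirm that the limit is respected. As a rough sketch (not part of the original code), you could count the active workers in shared memory and record the peak. The names worker, run, active and peak are hypothetical, and the sleep is shortened only to keep the run quick:

```python
import multiprocessing
import time


def worker(semaphore, active, peak):
    """Hold the semaphore while recording how many workers run at once."""
    with semaphore:
        with active.get_lock():
            active.value += 1
            # Remember the highest number of simultaneously active workers.
            peak.value = max(peak.value, active.value)
        time.sleep(0.2)  # stand-in for the real work
        with active.get_lock():
            active.value -= 1


def run(n_processes=10, concurrency=3):
    """Start n_processes workers and return the peak number active at once."""
    # Created once in the parent and passed explicitly to every child,
    # so all children share the same semaphore and counters.
    semaphore = multiprocessing.Semaphore(concurrency)
    active = multiprocessing.Value("i", 0)
    peak = multiprocessing.Value("i", 0)
    processes = [
        multiprocessing.Process(target=worker, args=(semaphore, active, peak))
        for _ in range(n_processes)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return peak.value


if __name__ == "__main__":
    # Shows whether this platform defaults to fork, spawn or forkserver.
    print("start method:", multiprocessing.get_start_method())
    print("peak concurrency:", run())
```

If the semaphore is shared correctly, the reported peak should not exceed the concurrency value, whichever start method the platform uses.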
Update
I only just now saw the comment posted by Charchit Agarwal, which is exactly what I have said.