I have the following code:
import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE THEN IT WORKS
semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
    semaphore.acquire()
    time.sleep(1)
    element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
    print "WRITE -> " + element
    output.put(element)
    time.sleep(1)
    semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside queue
    """
    output = multiprocessing.Manager().Queue()
    pool = multiprocessing.Pool(4)
    lst = range(40)
    # WHEN SEMAPHORE IS DEFINED HERE THEN IT DOES NOT WORK
    # semaphore = multiprocessing.Semaphore(1)
    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()
    counter = 0
    while True:
        try:
            print "READ <- " + output.get_nowait()
            counter += 1
            if (counter == len(lst)):
                print "Break"
                break
        except:
            print "READ <- NOTHING IN BUFFER"
            pass
        time.sleep(1)
This code is working as expected and it prints:
READ <- NOTHING IN BUFFER
WRITE -> PROCESS: 0 PID: 15803 PPID: 15798
READ <- NOTHING IN BUFFER
READ <- PROCESS: 0 PID: 15803 PPID: 15798
READ <- NOTHING IN BUFFER
WRITE -> PROCESS: 1 PID: 15806 PPID: 15798
READ <- PROCESS: 1 PID: 15806 PPID: 15798
...
Then I have this version, which does not work (it is basically the same as the first one, except that the semaphore is defined in a different place):
import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE THEN IT WORKS
# semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
    print hex(id(semaphore))
    semaphore.acquire()
    time.sleep(1)
    element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
    print "WRITE -> " + element
    output.put(element)
    time.sleep(1)
    semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside queue
    """
    output = multiprocessing.Manager().Queue()
    pool = multiprocessing.Pool(4)
    lst = range(40)
    # WHEN SEMAPHORE IS DEFINED HERE THEN IT DOES NOT WORK
    semaphore = multiprocessing.Semaphore(1)
    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()
    counter = 0
    while True:
        try:
            print "READ <- " + output.get_nowait()
            counter += 1
            if (counter == len(lst)):
                print "Break"
                break
        except:
            print "READ <- NOTHING IN BUFFER"
            pass
        time.sleep(1)
This version prints:
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
...
It seems as if producer never writes anything to the Queue. I've read somewhere that apply_async does not print error messages, so I changed pool.apply_async(producer, (i, output)) to pool.apply(producer, (i, output)) in the second version to see what is going on. It seems that semaphore is not defined; here is the output:
Traceback (most recent call last):
  File "glob_var_wrong.py", line 31, in <module>
    pool.apply(producer, (i, output))
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 244, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: global name 'semaphore' is not defined
However, the following code runs correctly and prints 10 (the value defined inside __main__):
global_var = 20

def print_global_var():
    print global_var

if __name__ == '__main__':
    global_var = 10
    print_global_var()
It seems that in this code the global variable can be defined inside __main__, while in the previous examples that is not possible. At first I assumed that variables defined inside __main__ are not shared between processes, but this only affects semaphore and not output, pool, or lst. Why is this happening?
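When you create a Pool, it immediately starts its worker processes using multiprocessing.Process (which Pool uses under the hood). On Linux each worker is forked from the parent, so it gets a copy of the parent's globals as they exist at that moment. A later apply_async call only pickles a reference to the function together with its arguments and sends that to a worker to evaluate.
Because you did not define the variable semaphore before calling Pool(4), it is undefined in those other processes where the code gets evaluated, and the function producer throws an exception. output is not affected because it is passed to producer as an argument, and pool and lst are only ever used in the parent process.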
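The exception is easy to miss because apply_async stores it in the returned AsyncResult instead of printing it. Here is a minimal sketch of how calling get() on that result re-raises it in the parent. The names uses_missing_global, late_global and run_and_report are hypothetical and only serve the illustration, and print() is called with a single argument so the snippet should also run on Python 3:

```python
import multiprocessing


def uses_missing_global():
    # 'late_global' is a hypothetical name that is never defined at module
    # level, so this lookup raises NameError wherever the function runs.
    return late_global


def run_and_report(pool):
    # apply_async returns an AsyncResult right away; an exception raised by
    # the task is stored in it rather than printed.
    result = pool.apply_async(uses_missing_global)
    try:
        result.get(timeout=10)  # get() re-raises the stored exception here
    except NameError as exc:
        return "worker failed: %s" % exc
    return "worker succeeded"


if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    print(run_and_report(pool))
    pool.close()
    pool.join()
```

Keeping the AsyncResult objects and calling get() on each of them is one way to find such failures without switching to the blocking pool.apply.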
To see this, change the definition of producer to:
def producer(num, output):
    try:
        # Looking up the missing global raises NameError in the workers,
        # so it has to happen inside the try block to be reported.
        print hex(id(semaphore))
        semaphore.acquire()
    except Exception as e:
        print e
        return
    time.sleep(1)
    element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
    print "WRITE -> " + element
    output.put(element)
    time.sleep(1)
    semaphore.release()
Now your failing code will print 40 errors that look like:
global name 'semaphore' is not defined
This is why semaphore has to be defined before calling Pool.