Tags: python, multithreading, multiprocessing, python-multiprocessing

outside vs inside __main__ variable definition in multiprocessing


I have the following code:

import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE THEN IT WORKS
semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
  semaphore.acquire()
  time.sleep(1)
  element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
  print "WRITE -> " + element
  output.put(element)
  time.sleep(1)
  semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside the queue
    """

    output    = multiprocessing.Manager().Queue()
    pool      = multiprocessing.Pool(4)
    lst       = range(40)

    # WHEN SEMAPHORE IS DEFINED HERE THEN IT DOES NOT WORK
    # semaphore = multiprocessing.Semaphore(1)

    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()

    counter = 0
    while True:
      try:
        print "READ  <- " + output.get_nowait()
        counter += 1
        if (counter == len(lst)):
          print "Break"
          break
      except:
        print "READ  <- NOTHING IN BUFFER"  
        pass
      time.sleep(1)

This code works as expected and prints:

READ  <- NOTHING IN BUFFER
WRITE -> PROCESS: 0 PID: 15803 PPID: 15798
READ  <- NOTHING IN BUFFER
READ  <- PROCESS: 0 PID: 15803 PPID: 15798
READ  <- NOTHING IN BUFFER
WRITE -> PROCESS: 1 PID: 15806 PPID: 15798
READ  <- PROCESS: 1 PID: 15806 PPID: 15798
...

Then I have this version, which does not work. It is the same as the first one except that semaphore is defined in a different place:

import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE THEN IT WORKS
# semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
  print hex(id(semaphore))
  semaphore.acquire()
  time.sleep(1)
  element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
  print "WRITE -> " + element
  output.put(element)
  time.sleep(1)
  semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside the queue
    """

    output    = multiprocessing.Manager().Queue()
    pool      = multiprocessing.Pool(4)
    lst       = range(40)

    # WHEN SEMAPHORE IS DEFINED HERE THEN IT DOES NOT WORK
    semaphore = multiprocessing.Semaphore(1)

    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()

    counter = 0
    while True:
      try:
        print "READ  <- " + output.get_nowait()
        counter += 1
        if (counter == len(lst)):
          print "Break"
          break
      except:
        print "READ  <- NOTHING IN BUFFER"  
        pass
      time.sleep(1)

This version prints:

READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
...

It seems as if producer never writes anything to the queue. I've read somewhere that apply_async does not print error messages, so in the second version I changed pool.apply_async(producer, (i, output)) to pool.apply(producer, (i, output)) to see what is going on. It turns out that semaphore is not defined; here is the output:

Traceback (most recent call last):
  File "glob_var_wrong.py", line 31, in <module>
    pool.apply(producer, (i, output))
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 244, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: global name 'semaphore' is not defined
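For reference, apply_async does not throw the exception away. It stores it in the returned AsyncResult, and get() re-raises it in the parent. The traceback above shows this directly, because apply() is just apply_async(...).get(). Below is a minimal sketch that assumes Python 3, whose apply_async also accepts an error_callback (Python 2.7's does not). The producer here is a hypothetical stand-in that refers to an undefined semaphore, the same way the failing version does:

```python
import multiprocessing


def producer(num):
    # Hypothetical stand-in for the failing producer: `semaphore` is never
    # defined anywhere in this sketch, so this line raises NameError in the worker.
    semaphore.acquire()
    return num


def report(exc):
    # error_callback: called in the parent process (in the pool's
    # result-handler thread) with the exception raised in the worker
    print("error_callback got: %r" % (exc,))


if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        # apply_async returns at once; nothing is printed when the worker fails
        res = pool.apply_async(producer, (1,), error_callback=report)
        try:
            res.get(timeout=30)  # re-raises the worker's NameError here
        except NameError as e:
            print("get() re-raised: %s" % e)
```

If you never call get() (or pass an error_callback), a failing task looks exactly like the "NOTHING IN BUFFER" loop in the question.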

However, the following code runs correctly and prints 10 (the value assigned inside __main__):

global_var = 20

def print_global_var():
    print global_var

if __name__ == '__main__':
    global_var = 10
    print_global_var()

It seems that in this code a global variable can be defined inside __main__, while in the previous code it cannot. At first I assumed that variables defined inside __main__ are not shared between processes, but that only affects semaphore and not output, pool or lst. Why is this happening?
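The behaviour can be reproduced without the semaphore or the queue. Below is a minimal sketch that assumes Python 3 on Unix. It forces the "fork" start method through get_context, which is the method Python 2.7 always uses on Unix. The names lookup, BEFORE and AFTER are hypothetical. A global assigned before the pool is created is visible in the workers, but one assigned after it is not, even though the parent sees both:

```python
import multiprocessing

BEFORE = "defined before the pool"  # module level: exists when the workers fork


def lookup(name):
    # Looks the name up in the module globals of whichever process runs it
    return globals().get(name, "<not defined in worker>")


if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")  # "fork" is only available on Unix
    pool = ctx.Pool(2)                 # the workers are forked right here
    AFTER = "defined after the pool"   # module-level global, but only in the parent
    print(lookup("AFTER"))                  # defined after the pool
    print(pool.apply(lookup, ("BEFORE",)))  # defined before the pool
    print(pool.apply(lookup, ("AFTER",)))   # <not defined in worker>
    pool.close()
    pool.join()
```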


Solution

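  • Pool(4) starts its worker processes immediately, using multiprocessing.Process under the hood. On Unix, Python 2.7 creates them with fork, so each worker starts as a copy of the parent process as it was at that moment, including whatever module-level globals had been defined so far. After that, only the tasks are pickled and sent to the workers. producer is pickled by reference (its module and name, not its code or globals), and its arguments i and output are pickled by value.

    Because you did not define semaphore until after calling Pool(4), the name does not exist in those OTHER processes where producer actually runs, and producer raises a NameError. The if __name__ == '__main__': block is not a separate scope, which is why your global_var example prints 10, all within one process. The assignment simply happens in the parent after the workers have already been forked. output, pool and lst are unaffected because producer never looks them up as globals. output is passed in explicitly as an argument (a Manager queue proxy, which can be pickled), while pool and lst are only used in the parent.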

    To see this, change the definition so that the error is caught and printed inside the worker:

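    def producer(num, output):
        try:
            # print is the first line that touches semaphore, so it must be
            # inside the try as well or the NameError escapes unprinted
            print hex(id(semaphore))
            semaphore.acquire()
        except Exception as e:
            print e
            return  # semaphore was never acquired, so there is nothing to release
        time.sleep(1)
        element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
        print "WRITE -> " + element
        output.put(element)
        time.sleep(1)
        semaphore.release()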
    

    and now your failing code will print 40 errors (one per task) that look like

    global name 'semaphore' is not defined
    

    This is why semaphore has to be defined BEFORE calling Pool: the workers only see the globals that existed when they were forked.
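One alternative, which is not part of the original answer, avoids depending on where the semaphore is defined. You hand the semaphore to each worker through Pool's initializer and initargs arguments, and the initializer runs once in every worker as it starts. Passing the semaphore as an apply_async argument instead fails with a RuntimeError, because multiprocessing only lets such objects be shared through inheritance at process creation. Below is a minimal sketch that assumes Python 3. init_worker is a hypothetical helper name:

```python
import multiprocessing
import os
import time

semaphore = None  # set in each worker by the hypothetical init_worker below


def init_worker(sem):
    # Runs once in every worker process when it starts; store the
    # semaphore received at process creation as a module global.
    global semaphore
    semaphore = sem


def producer(num, output):
    with semaphore:  # acquire() on entry, release() on exit, even if the body raises
        element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
        output.put(element)
        time.sleep(0.1)
    return num


if __name__ == "__main__":
    sem = multiprocessing.Semaphore(1)  # defined inside __main__ on purpose
    manager = multiprocessing.Manager()
    output = manager.Queue()
    with multiprocessing.Pool(4, initializer=init_worker, initargs=(sem,)) as pool:
        results = [pool.apply_async(producer, (i, output)) for i in range(8)]
        for r in results:
            r.get(timeout=30)  # re-raises any worker exception instead of hiding it
    while not output.empty():
        print("READ  <- " + output.get())
    manager.shutdown()
```

Because initargs travel with process creation, this also works with the spawn start method. Under spawn, a global assigned inside the __main__ guard would never exist in the workers at all.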