
Python Multiprocessing shared variables erratic behavior


The following simple code should, as far as I can see, always print out '0' at the end. However, when run with "lock = True", it often prints out other positive or negative numbers.

  import multiprocessing as mp
  import sys
  import time

  num = mp.Value('d', 0.0, lock = False)


  def func1():
      global num
      print ('start func1')
      #While num.value < 100000:
      for x in range(1000):
          num.value += 1
          #print(num.value)
      print ('end func1')

  def func2():
      global num
      print ('start func2')
      #while num.value > -10000:
      for x in range(1000):
          num.value -= 1
          #print(num.value)
      print ('end func2')

  if __name__ == '__main__':
      ctx = mp.get_context('fork')
      p1 = ctx.Process(target=func1)
      p1.start()
      p2 = ctx.Process(target=func2)
      p2.start()
      p1.join()
      p2.join()
      sys.stdout.flush()
      time.sleep(25)
      print(num.value)

Can anyone offer any explanation?

To clarify: when lock is set to "False", it behaves as expected and prints out '0'; however, when it is "True", it often does not.

This is more noticeable, and happens more often, for larger values of 'range'.

Tested this on two platforms (Mac OS X and Ubuntu 14.04.1), both with Python 3.6.


Solution

  • The docs for multiprocessing.Value are very explicit about this:

    Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do

    counter.value += 1
    

    Assuming the associated lock is recursive (which it is by default) you can instead do

    with counter.get_lock():
        counter.value += 1
    

    To your comment, this is not "1000 incrementations". This is 1000 iterations of:

    # Take lock on num.value
    temp_value = num.value    # (1)
    # release lock on num.value (anything can modify it now)
    temp_value += 1           # (2)
    # Take lock on num.value
    num.value = temp_value    # (3)
    # release lock on num.value
    

    That's what it means when it says += is not atomic.

    If num.value is modified by another process during step (2), then step (3) will write a stale value back to num.value.
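Applying the docs' advice to the original program means holding the Value's lock across each read-modify-write. A minimal sketch (assuming the default lock=True, so get_lock() is available):

```python
import multiprocessing as mp

num = mp.Value('d', 0.0)  # lock=True is the default


def func1():
    print('start func1')
    for x in range(1000):
        with num.get_lock():  # hold the lock across the read AND the write
            num.value += 1
    print('end func1')


def func2():
    print('start func2')
    for x in range(1000):
        with num.get_lock():
            num.value -= 1
    print('end func2')


if __name__ == '__main__':
    ctx = mp.get_context('fork')
    p1 = ctx.Process(target=func1)
    p2 = ctx.Process(target=func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num.value)  # now reliably 0.0
```

Because each += and -= happens entirely inside the lock, no process can observe (or clobber) a half-finished update, and the final value is always 0.0.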


    To give an example of a better way to approach what you're doing, here's a version using Queues that keeps the two processes in lockstep:

    import multiprocessing as mp
    import queue
    import sys
    
    
    # An increment process. Takes a value, increments it, passes it along
    def func1(in_queue: mp.Queue, out_queue: mp.Queue):
        print('start func1')
    
        for x in range(1000):
            n = in_queue.get()
            n += 1
            print("inc", n)
            out_queue.put(n)
        print('end func1')
    
    
    # A decrement process. Takes a value, decrements it, passes it along
    def func2(in_queue: mp.Queue, out_queue: mp.Queue):
        print('start func2')
        for x in range(1000):
            n = in_queue.get()
            n -= 1
            print("dec", n)
            out_queue.put(n)
        print('end func2')
    
    
    if __name__ == '__main__':
        ctx = mp.get_context('fork')
    
        queue1 = mp.Queue()
        queue2 = mp.Queue()
    
        # Make two processes and tie their queues back to back. They hand a value
        # back and forth until they've run their course.
        p1 = ctx.Process(target=func1, args=(queue1, queue2,))
        p1.start()
        p2 = ctx.Process(target=func2, args=(queue2, queue1,))
        p2.start()
    
        # Get it started
        queue1.put(0)
    
        # Wait for them to finish
        p1.join()
        p2.join()
    
        # Since this is a looping process, the result is on the queue we put() to.
        # (Using block=False because I'd rather throw an exception if something
        # went wrong rather than deadlock.)
        num = queue1.get(block=False)
    
        print("FINAL=%d" % num)
    

    This is a very simplistic example. In more robust code you need to think about what happens in failure cases: for example, if p1 throws an exception, p2 will deadlock waiting for its value. In many ways that's a good thing, since it means you can recover the system by starting a new p1 process with the same queues. This way of dealing with concurrency is called the Actor model, if you want to study it further.
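One simple hardening step, sketched below under the assumption that a bounded wait is acceptable: give the blocking get() calls a timeout, so a dead or hung peer surfaces as a queue.Empty exception instead of a deadlock. (fragile_worker and the 5-second timeouts here are illustrative, not part of the original code.)

```python
import multiprocessing as mp
import queue


# Illustrative worker: takes one value, increments it, passes it back.
def fragile_worker(in_q, out_q):
    n = in_q.get(timeout=5)  # bounded wait on the worker side too
    out_q.put(n + 1)


if __name__ == '__main__':
    in_q, out_q = mp.Queue(), mp.Queue()
    p = mp.Process(target=fragile_worker, args=(in_q, out_q))
    p.start()
    in_q.put(41)
    try:
        # Raises queue.Empty if the worker died or stalled
        result = out_q.get(timeout=5)
        print("got", result)
    except queue.Empty:
        # Worker is gone: kill it; a new one could be started on the same queues
        p.terminate()
    p.join()
```

The same pattern applies to the tick-tock example above: each in_queue.get() gets a timeout, and on queue.Empty the survivor can be torn down or a replacement process attached to the existing queues.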