Tags: python, multiprocessing, python-multiprocessing

How to share a global variable with another script in multiprocessing?


Question: How can I use the variable x in script2? I have two scripts: the first contains two multiprocessing functions and the second contains one. How can I share a single variable across all three multiprocessing functions?

script1.py

import multiprocessing
from script2 import function3

x = None
def function1():
    global x
    while True:
        x = input()  # updates global variable x

def function2():
    global x
    while True:
        print(x)     # prints global variable x

p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p3 = multiprocessing.Process(target=function3)
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()

# some condition to stop all processes

script2.py

def function3():
    while True:      
        print(x*2)   # prints global variable x*2

Solution

  • Here is an example of creating a shared managed string value per the comment offered by @martineau.

    On a platform such as Linux, where fork is the default method for creating new processes, you could code:

    import multiprocessing
    from ctypes import c_char_p
    
    s = multiprocessing.Manager().Value(c_char_p, '')
    event = multiprocessing.Event()
    
    def function1():
        s.value = 'New value'  # updates global variable s
        event.set() # show we have a new value
    
    def function2():
        event.wait() # wait for new s value
        print(s.value)
    
    p1 = multiprocessing.Process(target=function1)
    p2 = multiprocessing.Process(target=function2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    

    Prints:

    New value
    

    On platforms such as Windows, where spawn is used to create new processes, the shared string is passed as an argument to the processes to ensure that only one instance of the string is created.

    import multiprocessing
    from ctypes import c_char_p
    
    def function1(s, event):
        s.value = 'New value'
        event.set() # show we have a new value
    
    def function2(s, event):
        event.wait() # wait for new s value
        print(s.value)
    
    # I need this for Windows:
    if __name__ == '__main__':
        s = multiprocessing.Manager().Value(c_char_p, '')
        event = multiprocessing.Event()
        p1 = multiprocessing.Process(target=function1, args=(s, event))
        p2 = multiprocessing.Process(target=function2, args=(s, event))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    

    Prints:

    New value
    

    The if __name__ == '__main__': check above is needed; without it we would get into a recursive loop, because each newly created process starts executing the source from the top and would therefore create new processes ad infinitum. For the same reason, the definitions of s and event cannot sit outside that check, or else each newly created process would create its own instance of these variables. But that means we now have to pass these variables as arguments, whereas in the forking example they can simply be inherited.
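
    To tie this back to the original two-script layout, here is a minimal sketch of the spawn-safe pattern applied to script1.py and script2.py. It assumes function3 is rewritten to accept the shared value and event as parameters, and it keeps the 'New value' assignment in place of input(), since stdin is not generally available to child processes:

    script2.py

    def function3(s, event):
        event.wait()        # wait for a new value of s
        print(s.value * 2)  # the value is a string, so * 2 repeats it

    script1.py

    import multiprocessing
    from ctypes import c_char_p
    from script2 import function3

    def function1(s, event):
        s.value = 'New value'  # update the shared value
        event.set()            # signal that a new value is available

    def function2(s, event):
        event.wait()           # wait for a new value of s
        print(s.value)

    if __name__ == '__main__':
        s = multiprocessing.Manager().Value(c_char_p, '')
        event = multiprocessing.Event()
        p1 = multiprocessing.Process(target=function1, args=(s, event))
        p2 = multiprocessing.Process(target=function2, args=(s, event))
        p3 = multiprocessing.Process(target=function3, args=(s, event))
        p1.start()
        p2.start()
        p3.start()
        p1.join()
        p2.join()
        p3.join()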

    Update: Creating a Shared numpy Array on Linux/Unix

    import multiprocessing
    import ctypes
    import numpy as np
    
    def to_numpy_array(shared_array, shape):
        '''Create a numpy array backed by a shared memory Array.'''
        arr = np.ctypeslib.as_array(shared_array)
        return arr.reshape(shape)
    
    def to_shared_array(arr, ctype):
        shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
        temp = np.frombuffer(shared_array, dtype=arr.dtype)
        temp[:] = arr.flatten(order='C')
        return shared_array
    
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # From now on, use the numpy array that is backed by the shared array:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    
    def function1():
        for x in range(shape[0]):
            for y in range(shape[1]):
                arr[x, y] = 1
        event.set() # show we have a new value
    
    def function2():
        event.wait() # wait for new arr value
        print('arr =', arr)
    
    p1 = multiprocessing.Process(target=function1)
    p2 = multiprocessing.Process(target=function2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('arr =', arr)
    

    Prints:

    arr = [[1 1 1]
     [1 1 1]]
    arr = [[1 1 1]
     [1 1 1]]
    

    Creating a Shared numpy Array on Windows

    import multiprocessing
    import ctypes
    import numpy as np
    
    def to_numpy_array(shared_array, shape):
        '''Create a numpy array backed by a shared memory Array.'''
        arr = np.ctypeslib.as_array(shared_array)
        return arr.reshape(shape)
    
    def to_shared_array(arr, ctype):
        shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
        temp = np.frombuffer(shared_array, dtype=arr.dtype)
        temp[:] = arr.flatten(order='C')
        return shared_array
    
    def function1(arr, event):
        shape = arr.shape
        for x in range(shape[0]):
            for y in range(shape[1]):
                arr[x, y] = 1
        event.set() # show we have a new value
    
    def function2(arr, event):
        event.wait() # wait for new arr value
        print('arr =', arr)
    
    if __name__ == '__main__':
        arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
        shape = arr.shape
        shared_array = to_shared_array(arr, ctypes.c_int32)
        # From now on, use the numpy array that is backed by the shared array:
        arr = to_numpy_array(shared_array, shape)
        event = multiprocessing.Event()
    
        p1 = multiprocessing.Process(target=function1, args=(arr, event))
        p2 = multiprocessing.Process(target=function2, args=(arr, event))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        print('arr =', arr)
    

    Using a Shared numpy Array With a Multiprocessing Pool on Windows

    When using a multiprocessing pool, whether you pass the array as an argument to the worker function or, as in this case, use it to initialize a global variable for each process in the pool, you must pass the shared array itself to each process and recreate a numpy array from it.

    import multiprocessing
    import ctypes
    import numpy as np
    
    def to_numpy_array(shared_array, shape):
        '''Create a numpy array backed by a shared memory Array.'''
        arr = np.ctypeslib.as_array(shared_array)
        return arr.reshape(shape)
    
    def to_shared_array(arr, ctype):
        shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
        temp = np.frombuffer(shared_array, dtype=arr.dtype)
        temp[:] = arr.flatten(order='C')
        return shared_array
    
    def init_pool(shared_array, the_shape, the_event):
        global arr, shape, event
        shape = the_shape
        event = the_event
        # recreate the numpy array from the shared array:
        arr = to_numpy_array(shared_array, shape)
    
    def function1():
        for x in range(shape[0]):
            for y in range(shape[1]):
                arr[x, y] = 1
        event.set() # show we have a new value
    
    def function2():
        event.wait() # wait for new arr value
        print('arr =', arr)
    
    if __name__ == '__main__':
        arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
        shape = arr.shape
        shared_array = to_shared_array(arr, ctypes.c_int32)
        # From now on, use the numpy array that is backed by the shared array:
        arr = to_numpy_array(shared_array, shape)
        event = multiprocessing.Event()
        pool = multiprocessing.Pool(2, initializer=init_pool, initargs=(shared_array, shape, event))
        pool.apply_async(function1)
        pool.apply_async(function2)
        # wait for tasks to complete
        pool.close()
        pool.join()
        print('arr =', arr)
    

    Using a Shared numpy Array With a Multiprocessing Pool on Linux/Unix

    import multiprocessing
    import ctypes
    import numpy as np
    
    def to_numpy_array(shared_array, shape):
        '''Create a numpy array backed by a shared memory Array.'''
        arr = np.ctypeslib.as_array(shared_array)
        return arr.reshape(shape)
    
    def to_shared_array(arr, ctype):
        shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
        temp = np.frombuffer(shared_array, dtype=arr.dtype)
        temp[:] = arr.flatten(order='C')
        return shared_array
    
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # From now on, use the numpy array that is backed by the shared array:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    
    def function1():
        for x in range(shape[0]):
            for y in range(shape[1]):
                arr[x, y] = 1
        event.set() # show we have a new value
    
    def function2():
        event.wait() # wait for new arr value
        print('arr =', arr)
    
    pool = multiprocessing.Pool(2)
    pool.apply_async(function1)
    pool.apply_async(function2)
    # wait for tasks to complete
    pool.close()
    pool.join()
    print('arr =', arr)
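
    One caveat on the Linux/Unix versions: they rely on child processes inheriting arr, event, and the other module-level globals, which only works under the fork start method. As a minimal sketch (my own addition, not part of the original answer), you can request fork explicitly, for example on macOS, where spawn became the default in Python 3.8:

    import multiprocessing

    # Request the fork start method explicitly so that module-level
    # globals such as arr and event are inherited by child processes.
    ctx = multiprocessing.get_context('fork')
    pool = ctx.Pool(2)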