Question: How can I use variable x in script2? I have two scripts: script1.py contains two multiprocessing functions and script2.py contains one. How can I use a shared variable across all three multiprocessing functions?
script1.py
import multiprocessing
from script2 import function3

x = None

def function1():
    global x
    while True:
        x = input()  # updates global variable x

def function2():
    global x
    while True:
        print(x)  # prints global variable x

p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p3 = multiprocessing.Process(target=function3)
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
# some condition to stop all processes
script2.py
def function3():
    while True:
        print(x*2)  # prints global variable x*2
Here is an example of creating a shared, managed string value, as suggested in the comment by @martineau. On platforms such as Linux, where fork is used by default to create new processes, you could code:
import multiprocessing
from ctypes import c_char_p

s = multiprocessing.Manager().Value(c_char_p, '')
event = multiprocessing.Event()

def function1():
    s.value = 'New value'  # updates global variable s
    event.set()  # show we have a new value

def function2():
    event.wait()  # wait for new s value
    print(s.value)

p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()
Prints:
New value
On platforms such as Windows, where spawn is used to create new processes, the shared string must be passed as an argument to the processes to ensure that only one instance of the string is created:
import multiprocessing
from ctypes import c_char_p

def function1(s, event):
    s.value = 'New value'
    event.set()  # show we have a new value

def function2(s, event):
    event.wait()  # wait for new s value
    print(s.value)

# I need this for Windows:
if __name__ == '__main__':
    s = multiprocessing.Manager().Value(c_char_p, '')
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=function1, args=(s, event))
    p2 = multiprocessing.Process(target=function2, args=(s, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
Prints:
New value
The if __name__ == '__main__': check above is needed; without it we would get into a recursive loop, because each newly created process starts executing the source from the top and would therefore create new processes ad infinitum. For the same reason, the definitions of s and event cannot be outside that check, or else each newly created process would create its own instance of these variables. The price is that we now have to pass these variables as arguments, whereas in the forking example they can simply be inherited.
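To connect this back to the original two-script layout, here is a minimal sketch of one way to restructure the code. The changed signatures (passing s and event into function3) are my assumption about how you would adapt it, not the only option; because everything is passed explicitly, this version works under both fork and spawn.
script2.py
def function3(s, event):
    event.wait()  # wait for new s value
    print(s.value * 2)  # prints the shared string repeated twice

script1.py
import multiprocessing
from ctypes import c_char_p
from script2 import function3

def function1(s, event):
    s.value = 'New value'  # updates shared value s
    event.set()  # show we have a new value

def function2(s, event):
    event.wait()  # wait for new s value
    print(s.value)

if __name__ == '__main__':
    s = multiprocessing.Manager().Value(c_char_p, '')
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=function1, args=(s, event))
    p2 = multiprocessing.Process(target=function2, args=(s, event))
    p3 = multiprocessing.Process(target=function3, args=(s, event))
    for p in (p1, p2, p3):
        p.start()
    for p in (p1, p2, p3):
        p.join()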
Update: Creating a Shared numpy Array on Linux/Unix
import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()

def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value

def function2():
    event.wait()  # wait for new arr value
    print('arr =', arr)

p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()
print('arr =', arr)
Prints:
arr = [[1 1 1]
[1 1 1]]
arr = [[1 1 1]
[1 1 1]]
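As an aside, these examples create the Array with lock=False, which is safe here because the event guarantees the single writer has finished before the reader looks. If several processes were updating the array concurrently, you could let multiprocessing.Array create its default lock and serialize access through it. A minimal sketch (the helper name add_one is illustrative):
import multiprocessing
import ctypes
import numpy as np

def add_one(shared):
    with shared.get_lock():  # serialize concurrent writers
        # get_obj() exposes the underlying ctypes array:
        arr = np.ctypeslib.as_array(shared.get_obj()).reshape(2, 3)
        arr += 1

if __name__ == '__main__':
    shared = multiprocessing.Array(ctypes.c_int32, 6)  # lock=True is the default
    processes = [multiprocessing.Process(target=add_one, args=(shared,)) for _ in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(np.ctypeslib.as_array(shared.get_obj()).reshape(2, 3))  # every element is 4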
Creating a Shared numpy Array on Windows
import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def function1(shared_array, shape, event):
    # Recreate the numpy array from the shared array; pickling the numpy
    # view itself under spawn would copy the data instead of sharing it:
    arr = to_numpy_array(shared_array, shape)
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value

def function2(shared_array, shape, event):
    arr = to_numpy_array(shared_array, shape)
    event.wait()  # wait for new arr value
    print('arr =', arr)

if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=function1, args=(shared_array, shape, event))
    p2 = multiprocessing.Process(target=function2, args=(shared_array, shape, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('arr =', arr)
Using a Shared numpy Array With a Multiprocessing Pool on Windows
When using a multiprocessing pool, whether you are passing the array as an argument to the worker function or, as in this case, using it to initialize a global variable for each process in the pool, you must pass the shared array to each process and recreate a numpy array from it.
import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def init_pool(shared_array, the_shape, the_event):
    global arr, shape, event
    shape = the_shape
    event = the_event
    # recreate the numpy array from the shared array:
    arr = to_numpy_array(shared_array, shape)

def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value

def function2():
    event.wait()  # wait for new arr value
    print('arr =', arr)

if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2, initializer=init_pool, initargs=(shared_array, shape, event))
    pool.apply_async(function1)
    pool.apply_async(function2)
    # wait for tasks to complete
    pool.close()
    pool.join()
    print('arr =', arr)
Using a Shared numpy Array With a Multiprocessing Pool on Linux/Unix
import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()

def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value

def function2():
    event.wait()  # wait for new arr value
    print('arr =', arr)

pool = multiprocessing.Pool(2)
pool.apply_async(function1)
pool.apply_async(function2)
# wait for tasks to complete
pool.close()
pool.join()
print('arr =', arr)
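Finally, on Python 3.8+ the standard library also offers multiprocessing.shared_memory, which can back a numpy array with a named shared-memory block and works the same way under both fork and spawn. A minimal sketch, assuming Python 3.8+ (the function name fill is illustrative):
import multiprocessing
from multiprocessing import shared_memory
import numpy as np

def fill(name, shape, dtype):
    shm = shared_memory.SharedMemory(name=name)  # attach to the existing block by name
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    arr[:] = 1
    shm.close()

if __name__ == '__main__':
    src = np.zeros((2, 3), dtype=np.int32)
    shm = shared_memory.SharedMemory(create=True, size=src.nbytes)
    arr = np.ndarray(src.shape, dtype=src.dtype, buffer=shm.buf)
    arr[:] = src
    p = multiprocessing.Process(target=fill, args=(shm.name, src.shape, src.dtype))
    p.start()
    p.join()
    print('arr =', arr)  # all ones
    shm.close()
    shm.unlink()  # release the block when done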