I am trying to compare the efficiency of the multiprocessing module in Python by performing a CPU-intensive task.
Sequential Task:
import multiprocessing
import time

v1 = [0] * 5000000
v2 = [0] * 5000000

def worker1(nums):
    global v1
    for i in range(nums):
        v1[i] = i*i

def worker2(nums):
    global v2
    for i in range(nums):
        v2[i] = i*i*i

start = time.time()
worker1(5000000)
worker2(5000000)
end = time.time()
print(end-start)
Time taken for the sequential task: ~1 second
The same task using multiprocessing:
import multiprocessing
import time

def worker1(nums, v1):
    for i in range(nums):
        v1[i] = i*i

def worker2(nums, v2):
    for i in range(nums):
        v2[i] = i*i*i

v1 = multiprocessing.Array('i', 5000000)
v2 = multiprocessing.Array('i', 5000000)
p1 = multiprocessing.Process(target=worker1, args=(5000000, v1))
p2 = multiprocessing.Process(target=worker2, args=(5000000, v2))

start = time.time()
p1.start()
p2.start()
p1.join()
p2.join()
end = time.time()
print(end-start)
Time taken for the multiprocessing task: ~12 seconds
The difference between the two is very significant. Even though I understand that multiprocessing has some overhead, shouldn't it still be faster than the sequential version?
Please let me know if I am doing something wrong or if there is a silly mistake that should be corrected.
Python's multiprocessing.Array has lock=True by default, so every write acquires and releases a mutex (and potentially flushes the CPU caches). This alone accounts for about 11 of the 12 seconds of the multiprocessing version; using multiprocessing.Array('i', 5000000, lock=False) brings it down to about 1 second.
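To see the lock overhead in isolation, here is a minimal sketch (the fill helper and the timing layout are mine, just for illustration) that does the same 5M writes once through the default locked wrapper and once through a lock-free array:

import multiprocessing
import time

def fill(arr, n):
    for i in range(n):
        arr[i] = i * i

if __name__ == "__main__":
    n = 5000000
    locked = multiprocessing.Array('i', n)                # lock=True by default
    unlocked = multiprocessing.Array('i', n, lock=False)  # plain ctypes array

    start = time.time()
    fill(locked, n)    # every locked[i] = ... acquires and releases the mutex
    print(f"{time.time() - start:.4} locked Array")

    # if you do need the lock for other accesses, you can take it once and
    # write through the raw underlying object instead:
    #   with locked.get_lock():
    #       raw = locked.get_obj()
    #       ...
    start = time.time()
    fill(unlocked, n)  # no synchronization at all
    print(f"{time.time() - start:.4} lock=False Array")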
Now 2 processes take the same time as 1 process to do the same work. The culprit here is that we are also comparing list to multiprocessing.Array. If we use multiprocessing.Array for the single-process version too, we get:

0.8543 1 process, list
1.2004 1 process, Array
0.8488 2 process, Array

multiprocessing.Array is slower than list because a list stores pointers to Python integer objects, while Array has to unbox each object to obtain the underlying integer value and write it to the C array. Remember that Python integers have arbitrary precision; in fact, if you replace multiprocessing.Array with array.array you will get an overflow exception, and the data that was written to the multiprocessing.Array is not even correct.
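A minimal sketch of that last point (the variable names are just for illustration): the largest value the cubing loop writes, 4999999 ** 3, does not fit in a 32-bit C int, so array.array refuses the write while the ctypes-backed Array accepts it and stores a wrapped-around value:

import array
import multiprocessing

big = 4999999 ** 3  # far larger than the 2**31 - 1 maximum of a signed 32-bit int

a = array.array('i', [0])
try:
    a[0] = big
except OverflowError as e:
    print("array.array:", e)   # array.array raises instead of storing garbage

shared = multiprocessing.Array('i', 1, lock=False)
shared[0] = big                # no exception, but the stored value has wrapped
print("multiprocessing.Array stored:", shared[0], "expected:", big)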
import multiprocessing
import time

def worker1(nums, v1):
    for i in range(nums):
        v1[i] = i * i

def worker2(nums, v2):
    for i in range(nums):
        v2[i] = i * i * i

def one_process_list():
    v1 = [0] * 5000000
    v2 = [0] * 5000000

    def worker1(nums):
        for i in range(nums):
            v1[i] = i * i

    def worker2(nums):
        for i in range(nums):
            v2[i] = i * i * i

    start = time.time()
    worker1(5000000)
    worker2(5000000)
    end = time.time()
    print(f"{end-start:.4} 1 process, list")

def one_process_array():
    v1 = multiprocessing.Array('i', 5000000, lock=False)
    v2 = multiprocessing.Array('i', 5000000, lock=False)
    start = time.time()
    worker1(5000000, v1)
    worker2(5000000, v2)
    end = time.time()
    print(f"{end - start:.4} 1 process, Array")

def two_process_array():
    v1 = multiprocessing.Array('i', 5000000, lock=False)
    v2 = multiprocessing.Array('i', 5000000, lock=False)
    p1 = multiprocessing.Process(target=worker1, args=(5000000, v1))
    p2 = multiprocessing.Process(target=worker2, args=(5000000, v2))
    start = time.time()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    print(f"{end - start:.4} 2 process, Array")

if __name__ == "__main__":
    one_process_list()
    one_process_array()
    two_process_array()
One way around this boxing is to wrap the shared memory in a numpy array (see Sharing contiguous numpy arrays between processes in python); this way the operations run directly in C without boxing.
import multiprocessing
import time
from multiprocessing.sharedctypes import RawArray

import numpy as np

def worker_numpy(nums, v1_raw):
    v1 = np.frombuffer(v1_raw, dtype=np.int32)
    v1[:] = np.arange(nums) ** 2  # iterates a 40 MB array 3 times !

def two_process_numpy():
    my_dtype = np.int32

    def create_shared_array(size, dtype=np.int32):
        dtype = np.dtype(dtype)
        if dtype.isbuiltin and dtype.char in 'bBhHiIlLfd':
            typecode = dtype.char
        else:
            typecode, size = 'B', size * dtype.itemsize
        return RawArray(typecode, size)

    v1 = create_shared_array(5000000, dtype=my_dtype)
    v2 = create_shared_array(5000000, dtype=my_dtype)
    p1 = multiprocessing.Process(target=worker_numpy, args=(5000000, v1))
    p2 = multiprocessing.Process(target=worker_numpy, args=(5000000, v2))
    start = time.time()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    print(f"{end - start:.4} 2 process, numpy")
0.8543 1 process, list
1.2004 1 process, Array
0.8488 2 process, Array
0.2774 2 process, numpy
0.0543 1 process, numpy
With numpy, essentially the entire time is spent spawning the 2 extra processes. Note that you may get different timings on Linux, where the cost of fork is lower than the cost of spawn on Windows, but the relative ordering won't change. Also, since numpy drops the GIL while it works, we can use multithreading instead to parallelize our code.
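For completeness, a minimal sketch of that last idea (the function names and the float64 dtype are my own choices, not taken from the code above): the same two fills done with threads, relying on numpy releasing the GIL inside its vectorized loops, so no shared-memory setup is needed at all:

import threading
import time

import numpy as np

def fill_power(nums, out, power):
    # numpy runs this in C and releases the GIL for large arrays,
    # so two threads can execute it concurrently
    out[:] = np.arange(nums, dtype=np.float64) ** power

def two_thread_numpy():
    v1 = np.zeros(5000000, dtype=np.float64)
    v2 = np.zeros(5000000, dtype=np.float64)
    t1 = threading.Thread(target=fill_power, args=(5000000, v1, 2))
    t2 = threading.Thread(target=fill_power, args=(5000000, v2, 3))
    start = time.time()
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    end = time.time()
    print(f"{end - start:.4} 2 threads, numpy")

if __name__ == "__main__":
    two_thread_numpy()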