I am trying to perform a computation between a 2D array and a collection of 2D arrays using `multiprocessing`. Suppose I have a matrix, `mat1`, and a collection of matrices, `test`, and I would like to compute all matrix products between `mat1` and the elements of `test`. I am using `multiprocessing` to run the computations in parallel, since `test` is very large. However, I noticed that even for a small `test`, the computation never completes; specifically, the program never seems to finish a single matrix multiplication. It appears that a call to a particular `sklearn` function is causing the issue. I wrote the following code to illustrate this (I use `partial` rather than `starmap` because I would like to use `imap` and `tqdm` at a later point in time):
from multiprocessing import Pool
from functools import partial
import numpy as np
import sklearn.metrics.pairwise

def bar(y, x):
    # this never seems to complete...
    mul = x @ y.T
    # ...so this never prints
    print('done')
    return mul

def foo():
    mat1 = np.ones((1000000, 14))
    test = (np.ones((1, 14)), np.ones((1, 14)))
    # these will finish
    print(mat1 @ test[0].T)
    print(mat1 @ test[1].T)
    with Pool(6) as pool:
        result = pool.map(partial(bar, x=mat1), test)
        pool.close()
        pool.join()

if __name__ == "__main__":
    # Causes the hang
    sklearn.metrics.pairwise.rbf_kernel(np.ones((9000, 14)),
                                        np.ones((9000, 14)))
    foo()
NOTE: For those unfamiliar with `partial`, this is from the documentation:

functools.partial(func[, *args][, **keywords])

Return a new partial object which when called will behave like func called with the positional arguments args and keyword arguments keywords.
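For instance, here is a minimal sketch of how `partial` freezes an argument (the function and variable names are just for illustration):

```python
from functools import partial

def multiply(x, y):
    # ordinary two-argument function
    return x * y

# freeze the keyword argument y=2; the resulting callable takes only x
double = partial(multiply, y=2)

print(double(5))   # 10
print(double(21))  # 42
```

This is the same pattern as `partial(bar, x=mat1)` above: `x` is frozen, and `pool.map` supplies the remaining positional argument `y`.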
I am forced to stop execution manually; otherwise, it will run forever. Am I not using `multiprocessing` correctly?
For those interested, the full traceback after force stopping can be found below:
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-18-6c073b574e37> in <module>
8
9 sklearn.metrics.pairwise.rbf_kernel(np.ones((9000, 14)), np.ones((9000, 14)))
---> 10 foo()
11
<ipython-input-17-d183fc19ae3c> in foo()
11 with Pool(6) as pool:
12 # this will not finish
---> 13 result = pool.map(partial(bar, x=mat1), test)
14 p.close()
15 p.join()
~/anaconda3/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
266 in a list that is returned.
267 '''
--> 268 return self._map_async(func, iterable, mapstar, chunksize).get()
269
270 def starmap(self, func, iterable, chunksize=None):
~/anaconda3/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
649
650 def get(self, timeout=None):
--> 651 self.wait(timeout)
652 if not self.ready():
653 raise TimeoutError
~/anaconda3/lib/python3.7/multiprocessing/pool.py in wait(self, timeout)
646
647 def wait(self, timeout=None):
--> 648 self._event.wait(timeout)
649
650 def get(self, timeout=None):
~/anaconda3/lib/python3.7/threading.py in wait(self, timeout)
550 signaled = self._flag
551 if not signaled:
--> 552 signaled = self._cond.wait(timeout)
553 return signaled
554
~/anaconda3/lib/python3.7/threading.py in wait(self, timeout)
294 try: # restore state no matter what (e.g., KeyboardInterrupt)
295 if timeout is None:
--> 296 waiter.acquire()
297 gotit = True
298 else:
KeyboardInterrupt:
UPDATE 1:
After more debugging, I have discovered something peculiar. After implementing sokato's code, I managed to fix this example. However, I can trigger the issue again by calling the following `sklearn` function right before `foo()` in the `__main__` block:
sklearn.metrics.pairwise.rbf_kernel(np.ones((9000, 14)), np.ones((9000, 14)))
I have updated the original post to reflect this.
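In case it helps others who land here: a likely culprit is that on Linux `multiprocessing` forks the parent process by default, and forking after a threaded BLAS/OpenMP call (such as `rbf_kernel`) can leave the workers stuck on a lock inherited from the parent. A common workaround, sketched below under that assumption, is to create the pool with the `spawn` start method so each worker begins from a fresh interpreter:

```python
from functools import partial
import multiprocessing as mp
import numpy as np

def bar(y, x):
    # plain matrix product, as in the original example
    return x @ y.T

def foo():
    mat1 = np.ones((1000, 14))  # smaller than the original, for a quick run
    test = (np.ones((1, 14)), np.ones((1, 14)))
    # "spawn" starts each worker in a fresh interpreter instead of forking,
    # so the workers do not inherit the parent's threaded BLAS/OpenMP state
    ctx = mp.get_context("spawn")
    with ctx.Pool(2) as pool:
        return pool.map(partial(bar, x=mat1), test)

if __name__ == "__main__":
    results = foo()
    print([r.shape for r in results])  # two (1000, 1) results
```

Note that with `spawn` the worker function must be importable at module level, and the `if __name__ == "__main__":` guard is mandatory.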
You need to close the multiprocessing pool, e.g.:
from multiprocessing import Pool
from functools import partial
import numpy as np

def bar(y, x):
    mul = x @ y.T
    print('done')
    return mul

def foo():
    mat1 = np.ones((1000000, 14))
    test = (np.ones((1, 14)), np.ones((1, 14)))
    with Pool(5) as p:
        result = p.map(partial(bar, x=mat1), test)
        p.close()

if __name__ == "__main__":
    foo()
To fit your exact syntax, you can do it like this:
pool = Pool(6)
result = pool.map(partial(bar, x=mat1), test)
pool.close()
If you are interested in learning more, I encourage you to check out the documentation: https://docs.python.org/3.4/library/multiprocessing.html?highlight=process#multiprocessing.pool.Pool
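Since the question mentions wanting `imap` with `tqdm` later, here is a rough sketch of how that could look (this assumes the third-party `tqdm` package is installed; the fallback stub keeps the sketch runnable without it):

```python
from multiprocessing import Pool
from functools import partial
import numpy as np

try:
    from tqdm import tqdm  # third-party progress bar (assumed installed)
except ImportError:
    def tqdm(iterable, **kwargs):
        # no-op fallback so the sketch still runs without tqdm
        return iterable

def bar(y, x):
    return x @ y.T

def foo():
    mat1 = np.ones((1000, 14))
    test = (np.ones((1, 14)),) * 4
    with Pool(2) as pool:
        # imap yields results one at a time, so the progress bar can update
        return list(tqdm(pool.imap(partial(bar, x=mat1), test),
                         total=len(test)))

if __name__ == "__main__":
    print(len(foo()))  # 4
```

Unlike `map`, `imap` is lazy, which is what lets `tqdm` report progress as each result arrives.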