I'm attempting to start multiprocessing in python 3.6 (the Anaconda distribution). I've heavily tested my internal function (numerical integration), so I'm confident that it works. What is currently giving me trouble is passing the proper ranges because I get some "none" returns.
import multiprocessing
from multiprocessing import Pool
def chunkwise(t, size=2):
it = iter(t)
return zip(*[it]*size)
def sint(tupl):
print('arg = ',tupl)
#lower = float(tupl[0])
#upper = float(tupl[1])
exit()
#ans = scipy.integrate.quad(int2,lower,upper)
#return ans
n_CPUs = 6
smin = float(10000)
smax = float(np.inf)
smax_spacing = float(2.5*10**12)
srange = np.linspace(smin,smax_spacing,n_CPUs)
srange = np.append(srange,np.inf)
print('length srange = ',len(srange))
filler=[]
for i in range(len(srange)):
if i == 0:
filler.append(float(srange[i]))
elif srange[i] == srange[-1]:
filler.append(float(srange[i]))
else:
filler.append(float(srange[i]))
filler.append(float(srange[i]))
srange = np.array(filler)
srange = list(chunkwise(srange))
def main():
pool = Pool(processes=n_CPUs)
res1 = pool.map(sint,[(smin,float(smin*2)), (float(smin*2),float(smin*3))])#srange)
res = sum(res1)
pool.close()
pool.join()
return res
if __name__ =="__main__":
result = main()
Some of my debugging process can be see in the code I included here. At the moment, I just want to see the arguments that are being passed to my sint() function. When I print the result, I get the result
arg = (number,bigger number)
None
arg = (number2, bigger number2)
None
Why are these "None"s arising? At present, their presence is causing overflows/NaNs that aren't present in the non-parallelized version of the code. Is there a way to not get the "None"s to show up? I tried checking for the presence of "None" in tupl, lower, and upper, but Python seems to not want to identify these (wouldn't print the message "None detected" that I wrote in).
Any help would be very appreciated! Let me know if more information is needed.
One issue is that multiprocessing launches a separate process for everything you've wrote, it creates a separate Python instance entirely, so your code is actually running everything you've put in global scope multiple times. Running your code will return
>>> length srange = 7
>>> length srange = 7
multiple times for me. You need to move your other code into either a separate function or just call it inside of def main()
. Fixing this still results in nones however, which appears to be due to the fact you don't actually return anything in your mapping function, smin
in pool.map
. Normally your results would be None
objects (and sum cannot sum over none objects either) but there's another problem here. Your processes don't actually close.
This is probably because you call exit, there isn't a return or anything, not even None
.
You don't call exit to end a mapping function, please look at multiprocessing to see the examples there. Just use a normal function as your mapper, no need to use a system call.
Even though this is not what you want, this is a simple example to show actual functioning multiprocessing code with your example:
EDIT: I didn't realize most of what you posted was not required, I encourage you to make minimal verifiable examples when you post questions, I've minified and changed what I origional posted to do actual integration, I also encourage you to use proper naming conventions when you ask questions and write your own code, sint
and tupl
are not exceptable descriptive names. What i've done here is shown you how integration can be carried out properly in parrallel using the same scipy integration utility you provided. You can replace integrated_function
with the code for your own function and it should work the same
from multiprocessing import Pool
from scipy import integrate
def integrated_function(x):
return x ** 2
def integration_process(integration_range):
print("thread launched, tuple = ", integration_range)
lower = float(integration_range[0])
upper = float(integration_range[1])
y, err = integrate.quad(integrated_function, lower, upper)
return y
def main():
# notice how we put this inside this main function
n_CPUs = 6
total_integration_range = 60000
integration_chunks = 6
integration_step = total_integration_range / integration_chunks
integration_ranges = [(i * integration_step, (i + 1) * integration_step) for i in range(integration_chunks)]
pool = Pool(processes=n_CPUs)
res1 = pool.map(integration_process, integration_ranges) # srange)
res = sum(res1)
print(res)
pool.close()
pool.join()
return res
if __name__ == "__main__":
result = main()
# thread launched, tuple = (0, 10000)
# thread launched, tuple = (10000, 20000)
# thread launched, tuple = (20000, 30000)
# thread launched, tuple = (30000, 40000)
# thread launched, tuple = (40000, 50000)
# thread launched, tuple = (50000, 60000)
# 72000000000000.0
If your function is complicated enough and the integration is large enough the overhead of multiprocessing should be low enough for it to be faster, note that printing out with in threads causes slowdown you don't want, so out side of debugging I would encourage you not to print.
EDIT: Since they want to do infinite integration I'll also post my thoughts and addendum to the code on that here, instead of leaving it burred in the comments.
Technically even with infinite integration range, you aren't actually integrating infinitely, the specific numerical methods of approximating integrating infinitely are beyond the scope of this question, however since scipy.ntegrate.quad
is a uses Gaussian Quadrature to carry out its integration (hence the name 'quad
'), it fixes this, and can take np.inf
as a bound. Unfortunately I don't know how to guarantee contiguous performance with this bound, it may take longer to do that bound than all of the rest of the integrations, or it may take much less time, which means dividing the work into equal chunks becomes harder. however you would only need to change the last bound on the integration ranges to also include infinity in the range.
That change looks like this:
integration_ranges = [(i * integration_step, (i + 1) * integration_step) for i in range(integration_chunks)]
# we take the last element of the array, and all but the last element of the tuple,
# and make a new tuple with np.inf as the last element
integration_ranges[-1] = integration_ranges[-1][:-1] + (np.inf,)
After doing this, your last bound should be bounded by infinity, so your total integration range will actually be 0 -> inf, even if total_integration_range
isn't infinity