Tags: python, python-3.x, parallel-processing, multiprocessing, nonetype

Multiprocessing in Python returns None unexpectedly


I'm attempting to start multiprocessing in Python 3.6 (the Anaconda distribution). I've heavily tested my internal function (numerical integration), so I'm confident that it works. What is currently giving me trouble is passing the proper ranges, because I get some "None" returns.

import multiprocessing
from multiprocessing import Pool
import numpy as np
import scipy.integrate

def chunkwise(t, size=2):
    # group an iterable into consecutive, non-overlapping tuples of length `size`
    it = iter(t)
    return zip(*[it]*size)

def sint(tupl):
    print('arg = ',tupl)
    #lower = float(tupl[0])
    #upper = float(tupl[1])
    exit()
    #ans = scipy.integrate.quad(int2,lower,upper) 
    #return ans

n_CPUs = 6 

smin = float(10000)
smax = float(np.inf)
smax_spacing = float(2.5*10**12)
srange = np.linspace(smin,smax_spacing,n_CPUs)

srange = np.append(srange,np.inf)
print('length srange = ',len(srange))
filler=[]

for i in range(len(srange)):
    if i == 0:
        filler.append(float(srange[i]))
    elif srange[i] == srange[-1]:
        filler.append(float(srange[i]))
    else:
        filler.append(float(srange[i]))
        filler.append(float(srange[i]))
srange = np.array(filler)
srange = list(chunkwise(srange))
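# (for illustration: with n_CPUs = 6, the loop above duplicates every interior
#  point of srange, so chunkwise() yields the contiguous pairs
#  (s0, s1), (s1, s2), ..., (s5, inf) -- one integration subinterval per chunk)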

def main():
    pool = Pool(processes=n_CPUs)
    res1 = pool.map(sint,[(smin,float(smin*2)),  (float(smin*2),float(smin*3))])#srange)
    res = sum(res1)
    pool.close()
    pool.join()
    return res

if __name__ =="__main__":
    result = main()

Some of my debugging process can be seen in the code I included here. At the moment, I just want to see the arguments that are being passed to my sint() function. When I run it, I get results like

arg = (number,bigger number)
None
arg = (number2, bigger number2)
None

Why are these "None"s arising? At present, their presence is causing overflows/NaNs that aren't present in the non-parallelized version of the code. Is there a way to keep the "None"s from showing up? I tried checking for the presence of None in tupl, lower, and upper, but Python never identified any of them as None (it wouldn't print the "None detected" message I wrote in).
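
For reference, the None values end up in the list that pool.map returns in the parent process, not in the arguments the workers see, which is why checks on tupl, lower, and upper never fire. A minimal check on the results themselves (a sketch, using the names from the code above):

res1 = pool.map(sint, srange)
if any(r is None for r in res1):
    print("None detected in results")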

Any help would be very appreciated! Let me know if more information is needed.


Solution

  • One issue is that multiprocessing creates an entirely separate Python instance for each worker process, so everything you've put in global scope actually runs multiple times. Running your code prints

    length srange =  7
    length srange =  7

    multiple times for me. You need to move that setup code into a separate function, or just call it inside of def main(). Fixing this still results in Nones, however, because you don't actually return anything from your mapping function, sint, in pool.map. Normally your results would just be None objects (and sum cannot sum over None objects either), but there's another problem here: your processes don't actually close.

    This is probably because you call exit(); the function never returns anything, not even None.

    Don't call exit() to end a mapping function; see the multiprocessing documentation for examples. Just use a normal function as your mapper; there is no need for a system call.
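
    To make both points concrete, here is a minimal standalone sketch (not your code): pool.map collects whatever the mapped function returns, and a mapper with no return statement just yields None:

    from multiprocessing import Pool


    def square(x):
        return x * x  # pool.map collects this return value


    def no_return(x):
        x * x  # computed and discarded; no return, so pool.map receives None


    if __name__ == "__main__":
        with Pool(processes=2) as pool:
            print(pool.map(square, [1, 2, 3]))     # [1, 4, 9]
            print(pool.map(no_return, [1, 2, 3]))  # [None, None, None]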

    Even though this is not exactly what you want, here is a simple example showing actual functioning multiprocessing code based on your example:

    EDIT: I didn't realize most of what you posted was not required; I encourage you to post a minimal verifiable example when you ask questions. I've minified and changed what I originally posted so it does actual integration. I also encourage you to use proper naming conventions when you ask questions and write your own code; sint and tupl are not acceptable descriptive names. What I've done here is show how integration can be carried out properly in parallel using the same scipy integration utility you used. You can replace integrated_function with the code for your own function and it should work the same.

    from multiprocessing import Pool
    from scipy import integrate
    
    
    def integrated_function(x):
        return x ** 2
    
    
    def integration_process(integration_range):
        print("thread launched, tuple = ", integration_range)
        lower = float(integration_range[0])
        upper = float(integration_range[1])
        y, err = integrate.quad(integrated_function, lower, upper)
        return y
    
    
    def main():
        # notice how we put this inside this main function
        n_CPUs = 6
        total_integration_range = 60000
        integration_chunks = 6
        integration_step = total_integration_range / integration_chunks
        integration_ranges = [(i * integration_step, (i + 1) * integration_step) for i in range(integration_chunks)]
        pool = Pool(processes=n_CPUs)
    res1 = pool.map(integration_process, integration_ranges)
        res = sum(res1)
        print(res)
        pool.close()
        pool.join()
        return res
    
    
    if __name__ == "__main__":
        result = main()
        # thread launched, tuple = (0, 10000)
        # thread launched, tuple = (10000, 20000)
        # thread launched, tuple = (20000, 30000)
        # thread launched, tuple = (30000, 40000)
        # thread launched, tuple = (40000, 50000)
        # thread launched, tuple = (50000, 60000)
        # 72000000000000.0
    

    If your function is complicated enough and the integration range is large enough, the overhead of multiprocessing should be low enough for this to be faster. Note that printing from within worker processes causes slowdown you don't want, so outside of debugging I would encourage you not to print.
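
    If you want to verify that the parallel version actually pays off on your machine, a rough timing sketch follows; slow_integrand here is a made-up stand-in for an expensive integrand, not your function:

    import time
    from multiprocessing import Pool
    from scipy import integrate


    def slow_integrand(x):
        # artificially expensive, smooth integrand
        return sum(1.0 / (n * n + x * x + 1.0) for n in range(1, 2000))


    def integrate_range(bounds):
        y, err = integrate.quad(slow_integrand, bounds[0], bounds[1])
        return y


    if __name__ == "__main__":
        ranges = [(i * 1000.0, (i + 1) * 1000.0) for i in range(12)]

        start = time.perf_counter()
        serial = sum(integrate_range(r) for r in ranges)
        print("serial:  ", time.perf_counter() - start)

        start = time.perf_counter()
        with Pool(processes=6) as pool:
            parallel = sum(pool.map(integrate_range, ranges))
        print("parallel:", time.perf_counter() - start)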

    EDIT: Since they want to do infinite integration, I'll also post my thoughts and an addendum to the code here, instead of leaving it buried in the comments.

    Technically, even with an infinite integration range you aren't actually integrating infinitely; the specific numerical methods for approximating an integral over an infinite range are beyond the scope of this question. However, since scipy.integrate.quad uses Gaussian quadrature to carry out its integration (hence the name 'quad', from quadrature), it handles this and can take np.inf as a bound. Unfortunately, I don't know how to guarantee consistent run time with that bound; it may take longer than all of the rest of the integrations combined, or it may take much less time, which makes dividing the work into equal chunks harder. However, you would only need to change the last of the integration ranges so that its upper bound is infinity.

    That change looks like this:

    import numpy as np  # np.inf requires numpy

    integration_ranges = [(i * integration_step, (i + 1) * integration_step) for i in range(integration_chunks)]
    # take the last element of the list, all but the last element of that tuple,
    # and make a new tuple with np.inf as the upper bound
    integration_ranges[-1] = integration_ranges[-1][:-1] + (np.inf,)
    

    After doing this, your last range is bounded by infinity, so your total integration range will actually be 0 -> inf, even though total_integration_range isn't infinite.
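
    As a quick sanity check that quad really accepts an infinite bound, here is a standalone sketch; exp(-x) is used instead of x ** 2 because x ** 2 diverges over an infinite range:

    import numpy as np
    from scipy import integrate


    def decaying(x):
        return np.exp(-x)


    # the integral of exp(-x) from 0 to infinity is exactly 1
    y, err = integrate.quad(decaying, 0, np.inf)
    print(y)  # ~1.0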