Search code examples
python parallel-python

How to limit Parallel Python code to a specific/required range of values


I am using Parallel Python to run the code below. Could anyone correct the code? It should print "SORRY" for values greater than 20 in the inputs list, but it still produces results for values greater than 20.

#http://www.parallelpython.com/

import sys,time,pp
import numpy 

n=int(raw_input("Enter the value of n: "))
# n is the argument of the single demo job submitted further down.
# Enter any integer value; int() raises ValueError on non-numeric text.

#Define is_low function as a filter 
def is_low(n):
    """Classify n against the threshold of 20.

    Returns True when n is below 20 and False when n is above 20.
    For exactly 20 neither comparison holds, so control falls off the
    end of the function and the result is None.
    """
    if n > 20:
        return False
    if n < 20:
        return True

def task(n):
    """Return numpy.arange(n), i.e. the array 0, 1, ..., n-1.

    No threshold check is performed here; n is passed straight through.
    """
    sequence = numpy.arange(n)
    return sequence


def expansion(n):
    if is_low(n)==True:
       return (task(n))
    if is_low(n)==False:
        print "SORRY"

# Driver script: create a pp job server, run one job for the user-supplied n,
# then run one job per value in `inputs` and report timing and statistics.

# Empty tuple: no remote pp servers are listed.
ppservers=()
# An optional first command-line argument sets the worker count; without it
# pp.Server is created with its own default.
if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    job_server = pp.Server(ppservers=ppservers)


print job_server.get_ncpus()
# NOTE(review): the submitted callable is task(), which only calls
# numpy.arange(n). is_low and expansion are passed as dependent functions,
# but task() never calls them, so no threshold check runs for this job.
job1=job_server.submit(task,(n,),(is_low,expansion,),("numpy","sys",))
# Calling the job object yields its result (presumably blocking until the
# worker has finished; confirm against the pp documentation).
result=job1()
print result

# NOTE(review): a second pp.Server is created here with the same settings and
# rebinds job_server; the first server is not explicitly shut down.
ppservers = ()

if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    job_server = pp.Server(ppservers=ppservers)

print "Starting pp with", job_server.get_ncpus(), "workers"

start_time = time.time()
# Values 1-15 are below the limit checked by is_low; 21-26 are above it.
inputs = (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,21,22,23,24,25,26)
# All jobs are submitted first and their results collected afterwards.
# As above, task() is submitted, so is_low/expansion are never invoked.
# The loop variable `input` shadows the builtin of the same name.
jobs = [(input, job_server.submit(task,(input,),(expansion,is_low,),  ("numpy","sys","time",))) for input in inputs]
for input, job in jobs:
    print "Expansion for the below",input,"is",job()

print "Time elapsed: ", time.time() - start_time, "s"
job_server.print_stats()

Solution

  • In is_low you should check that n <= 20 rather than n < 20. If you specify n as 20, nothing gets returned. I think is_low should look like this:

    def is_low(n):
        """Return True when n <= 20, otherwise False.

        A single comparison replaces the two separate `if` branches, so
        every input gets an explicit answer and nothing can fall through
        both tests to an implicit None.
        """
        return n <= 20
    

    I believe the real reason your code doesn't work is that you have reversed what the functions expansion and task are doing. If you change these two functions to be:

    def expansion(n):
        """Return numpy.arange(n), i.e. the array 0, 1, ..., n-1."""
        values = numpy.arange(n)
        return values
    
    def task(n):
        """Return expansion(n) when is_low(n) accepts n, else the string "SORRY".

        is_low is evaluated once and used directly as the condition, so
        every call returns either the expansion or "SORRY"; there is no
        path that silently returns None.
        """
        if is_low(n):
            return expansion(n)
        return "SORRY"
    

    This is different from what you have: task should call expansion, not the other way around. I also return "SORRY" rather than print it. Rather than returning "SORRY", you should return a value like -1 to signal an error. When the jobs are complete, you can check for -1 and display a message.

    The code below isn't entirely what you have asked for but I think it is enough to get you going:

    #http://www.parallelpython.com/
    
    import sys,time,pp
    import numpy
    
    n=int(raw_input("Enter the value of n: "))
    # n is the argument of the single demo job submitted further down.
    # Enter any integer value; int() raises ValueError on non-numeric text.
    
    #Define is_low function as a filter
    def is_low(n):
        """Return True when n <= 20, otherwise False.

        A single comparison replaces the two separate `if` branches, so
        every input gets an explicit answer and nothing can fall through
        both tests to an implicit None.
        """
        return n <= 20
    
    def expansion(n):
        """Return numpy.arange(n), i.e. the array 0, 1, ..., n-1."""
        values = numpy.arange(n)
        return values
    
    
    def task(n):
        """Return expansion(n) when is_low(n) accepts n, else the string "SORRY".

        is_low is evaluated once and used directly as the condition, so
        every call returns either the expansion or "SORRY"; there is no
        path that silently returns None.
        """
        if is_low(n):
            return expansion(n)
        return "SORRY"
    
    ppservers=()
    if len(sys.argv) > 1:
        ncpus = int(sys.argv[1])
        job_server = pp.Server(ncpus, ppservers=ppservers)
    else:
        job_server = pp.Server(ppservers=ppservers)
    
    
    print job_server.get_ncpus()
    job1=job_server.submit(task,(n,),(is_low,expansion,),("numpy","sys",))
    result=job1()
    print result
    
    ppservers = ()
    
    if len(sys.argv) > 1:
        ncpus = int(sys.argv[1])
        job_server = pp.Server(ncpus, ppservers=ppservers)
    else:
        job_server = pp.Server(ppservers=ppservers)
    
    print "Starting pp with", job_server.get_ncpus(), "workers"
    
    start_time = time.time()
    inputs = (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,21,22,23,24,25,26)
    jobs = [(input, job_server.submit(task,(input,),(expansion,is_low,),  ("numpy","sys","time",))) for input in inputs]
    for input, job in jobs:
        print "Expansion for the below",input,"is",job()
    
    print "Time elapsed: ", time.time() - start_time, "s"
    job_server.print_stats()