Search code examples
pythonparallel-processingpython-multiprocessingembarrassingly-parallelparallelism-amdahl

Expected speedup from embarrassingly parallel task using Python Multiprocessing


I'm learning to use Python's Multiprocessing package for embarrassingly parallel problems, so I wrote serial and parallel versions for determining the number of primes less than or equal to a natural number n. Based on what I read from a blog post and a Stack Overflow question, I came up with the following code:

Serial

import math
import time

def is_prime(start, end):
    """determine how many primes within given range"""
    numPrime = 0
    for n in range(start, end+1):
        isPrime = True
        for i in range(2, math.floor(math.sqrt(n))+1):
            if n % i == 0:
                isPrime = False
                break
        if isPrime:
            numPrime += 1
    if start == 1:
        numPrime -= 1  # since 1 is not prime
    return numPrime

if __name__ == "__main__":
    natNum = 0
    while natNum < 2:
        natNum = int(input('Enter a natural number greater than 1: '))
    startTime = time.time()
    finalResult = is_prime(1, natNum)
    print('Elapsed time:', time.time()-startTime, 'seconds')
    print('The number of primes <=', natNum, 'is', finalResult)

Parallel

import math
import multiprocessing as mp
import numpy
import time


def is_prime(vec, output):
    """determine how many primes in vector"""
    numPrime = 0
    for n in vec:
        isPrime = True
        for i in range(2, math.floor(math.sqrt(n))+1):
            if n % i == 0:
                isPrime = False
                break
        if isPrime:
            numPrime += 1
    if vec[0] == 1:
        numPrime -= 1  # since 1 is not prime
    output.put(numPrime)


def chunks(vec, n):
    """evenly divide list into n chunks"""
    for i in range(0, len(vec), n):
        yield vec[i:i+n]

if __name__ == "__main__":
    natNum = 0
    while natNum < 2:
        natNum = int(input('Enter a natural number greater than 1: '))
    numProc = 0
    while numProc < 1:
        numProc = int(input('Enter the number of desired parallel processes: '))
    startTime = time.time()
    numSplits = math.ceil(natNum/numProc)
    splitList = list(chunks(tuple(range(1, natNum+1)), numSplits))
    output = mp.Queue()
    processes = [mp.Process(target=is_prime, args=(splitList[jobID], output))
                 for jobID in range(numProc)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print('Elapsed time:', time.time()-startTime, 'seconds')
    procResults = [output.get() for p in processes]
    finalResult = numpy.sum(numpy.array(procResults))
    print('Results from each process:\n', procResults)
    print('The number of primes <=', natNum, 'is', finalResult)

Here is what I get for n=10000000 (for parallel I request 8 processes):

$ python serial_prime_test.py 
Enter a natural number greater than 1: 10000000
Elapsed time: 162.1960825920105 seconds
The number of primes <= 10000000 is 664579
$ python parallel_prime_test.py
Enter a natural number greater than 1: 10000000
Enter the number of desired parallel processes: 8
Elapsed time: 49.41204643249512 seconds
Results from each process:
[96469, 86603, 83645, 80303, 81796, 79445, 78589, 77729]
The number of primes <= 10000000 is 664579

So it looks like I can get a little over 3x speedup. Here are my questions:

  1. Clearly this is not linear speedup, so how much better can I do (or what speedup should I realistically expect)?
  2. It looks like Amdahl's Law answers this, but I don't know how to determine what fraction of my program is strictly serial.

Any help is appreciated.

Edit: There are 4 physical cores, capable of hyperthreading.


Solution

  • I think you want to divide the work up differently.

    Although your program divides the range of candidate integers evenly across cores, the work in each range is not likely to be even. That means some cores finish early, and have nothing to do, while others are still running. That loses parallel efficiency, fast.

    Just to make the point, imagine you have 1000 cores. The first core sees very small candidate numbers and doesn't take long to factor them, then goes idle. The last (thousandth) core sees only very big candidate numbers, and takes much longer to factor them. So it runs, while the first core sits idle. Wasted cycles. Similarly for 4 cores.

    What you want to do when the amount of work being handed to a core is unknown, is hand all the cores a lot of modest sized chunks, many more than there are cores. Then cores can finish at uneven rates, and each core goes back to find a bit more work to do. This is essentially a work-list algorithm. You end up with un-evenness at the very end, but it is only on small chunks so not much is wasted.

    I'm not a Python programmer, so I coded a solution in Parlanse instead.

    (includeunique `Console.par')
    (includeunique `Timer.par')
    
    (define upper_limit 10000000)
    
    (define candidates_per_segment 10)
    (define candidates_per_segment2 (constant (* candidates_per_segment 2)))
    
    (= [prime_count natural] 0)
    [prime_finding_team team]
    
    (define primes_in_segment
    (action (procedure [lower natural] [upper natural])
       (;;
          (do [candidate natural] lower upper 2
          (block test_primality
            (local (= [divisor natural] 3)
               (;;
                  (while (< (* divisor divisor) candidate)
                      (ifthenelse (== (modulo candidate divisor) 0)
                         (exitblock test_primality)
                         (+= divisor 2)
                      )ifthenelse
                  )while
                  (ifthen (~= (* divisor divisor) candidate)
                     (consume (atomic+= prime_count))
                  )ifthen
               );;
            )local
          )block
          )do
      );;
      )action
    )define
    
    (define main
    (action (procedure void)
       (local [timer Timer:Timer]
         (;;
         (Console:Put (. `Number of primes found: '))
         (Timer:Reset (. timer))
         (do [i natural] 1 upper_limit candidates_per_segment2
            (consume (draft prime_finding_team primes_in_segment
                         `lower':i
                         `upper':(minimum upper_limit (- (+ i candidates_per_segment2) 2))))
         )do
         (consume (wait (@ (event prime_finding_team))))
         (Timer:Stop (. timer))
         (Console:PutNatural prime_count)
         (Console:PutNewline)
         (Timer:PrintElapsedTime (. timer) (. `Parallel computed in '))
         (Console:PutNewline)
         );;
      )local
    )action
    )define
    

    Parlanse looks like LISP, but works and compiles more like C.

    The worker is primes_in_segment; it takes a range of candidate values defined by its parameters lower and upper. It tries each candidate in that range, and increments (atomically) a total prime_count if that candidate is a prime.

    The full range is split into small packets of ranges (sequences of odd numbers) by the do loop in main. The parallelism happens on the draft command, which creates a parallel execution grain of computation (not a Windows thread) and adds it to the prime_finding_team, which is an aggregate set of work representing all the prime factoring. (The purpose of a team is to allow all this work to managed as a unit, e.g., destroyed if necessary, not needed in this program). The arguments to draft are the function to be run by the forked grain, and its parameters. The work is accomplished by a Parlanse-managed set of (Windows) threads using a work-stealing algorithm. If there is too much work, Parlanse throttles the work-generating grains, and spends its energy executing grains which are pure computation.

    One could pass only one candidate value to each grain, but then there's more fork overhead per candidate and the total runtime gets accordingly worse. We've chosen 10 empirically to ensure that the fork overhead per range of candidates is small; setting the candidates per segment to 1000 doesn't buy much additional speedup.

    The do loop simply manufactures work as fast as it can. Parlanse throttles the draft step when there is enough parallelism to be useful. The wait on the team event, causes the main program to wait for all team members to complete.

    We ran this on an HP hex-core AMD Phenom II X6 1090T 3.2 GHz. Execution runs are below; first for 1 CPU:

     >run -p1 -v ..\teamprimes
    PARLANSE RTS: Version 19.1.53
    # Processors = 1
    Number of primes found: 664579
    Parallel computed in 13.443294 seconds
    ---- PARLANSE RTS: Performance Statistics
      Duration = 13.527557 seconds.
      CPU Time Statistics:
      Kernel CPU Time: 0.031s
      User   CPU Time: 13.432s
      Memory Statistics:
    Peak Memory Usage    : 4.02 MBytes
      Steals: 0  Attempts: 0  Success rate: 0.0%  Work Rediscovered: 0
    Exiting with final status 0.
    

    Then for 6 CPUs (scales nicely):

    >run -p6 -v ..\teamprimes
    PARLANSE RTS: Version 19.1.53
    # Processors = 6
    Number of primes found: 664579
    Parallel computed in 2.443123 seconds
    ---- PARLANSE RTS: Performance Statistics
      Duration = 2.538972 seconds.
      CPU Time Statistics:
    Kernel CPU Time: 0.000s
    User   CPU Time: 14.102s
    Total  CPU Time: 14.102s
      Memory Statistics:
    Peak Memory Usage    : 4.28 MBytes
      Steals: 459817  Attempts: 487334  Success rate: 94.4%  Work Rediscovered: 153
    

    You note the total CPU time for the parallel version is roughly the same as for the serial version; this is because they are doing the same work.

    Given Python's "fork" and "join" operations, I'm sure there's a Python equivalent you can easily code. It might run out of space or threads because of the possibility of too many forks at the same time. (With candidates_per_segment at 10, there are up to 1 million live grains running under Parlanse). This is where automatic throttling the generation of work is a good thing to have. As a substitute, you can set candidates_per_segment to a much larger number, e.g., 10000, which means you only get 1000 threads worst case. (I think you will still pay a high price due the Python's interpretive nature). As you set the candidates per segment closer and closer to 1e7/4, you'll approach the exact behavior you have with your present Python code.