python · scala · bigdata · apache-spark · multicore

Why is this simple Spark program not utilizing multiple cores?


So, I'm running this simple program on a 16-core system. I launch it with the following command.

spark-submit --master local[*] pi.py

And the code of that program is the following.

#"""pi.py"""
from pyspark import SparkContext
import random

N = 12500000

def sample(p):
    # Draw a random point in the unit square; 1 if it lands inside the quarter circle.
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0

sc = SparkContext("local", "Test App")
count = sc.parallelize(xrange(0, N)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

When I use top to watch CPU consumption, only one core is being utilized. Why is that? Secondly, the Spark documentation says that the default parallelism is contained in the property spark.default.parallelism. How can I read this property from within my Python program?


Solution

  • Probably because the call to sc.parallelize puts all the data into a single partition. You can specify the number of partitions as the second argument to parallelize:

    part = 16
    count = sc.parallelize(xrange(N), part).map(sample).reduce(lambda a, b: a + b)
    

    Note that this would still generate the 12.5 million points with one CPU in the driver and then only spread them out to 16 partitions to perform the reduce step.

    A better approach would try to do most of the work after the partitioning: for example, the following generates only a tiny array on the driver and then lets each remote task generate the actual random numbers and the subsequent Pi approximation:

    part = 16
    count = ( sc.parallelize([0] * part, part)
               .flatMap(lambda _: [sample(p) for p in xrange(N / part)])
               .reduce(lambda a, b: a + b)
            )
    

    Finally (because the lazier we are, the better), Spark MLlib already ships with nicely parallelized random data generation; have a look here: http://spark.apache.org/docs/1.1.0/mllib-statistics.html#random-data-generation. So maybe the following is close to what you're trying to do (not tested, so probably not working as-is, but it should hopefully be close):

    from pyspark.mllib.random import RandomRDDs

    count = ( RandomRDDs.uniformRDD(sc, N, part)
               .zip(RandomRDDs.uniformRDD(sc, N, part))
               .filter(lambda (x, y): x*x + y*y < 1)
               .count()
            )