I'm running this simple program on a 16-core machine. I launch it with:
spark-submit --master local[*] pi.py
The program itself is:
# pi.py
from pyspark import SparkContext
import random

N = 12500000

def sample(p):
    # one random point in the unit square; 1 if it falls inside the quarter circle
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0

sc = SparkContext("local", "Test App")
count = sc.parallelize(range(0, N)).map(sample).reduce(lambda a, b: a + b)
print("Pi is roughly %f" % (4.0 * count / N))
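The printed value rests on a simple probability argument: for x and y drawn uniformly from [0, 1), the chance that the point lands inside the quarter circle equals the quarter circle's area. The standard-error line is a worked addition, not part of the question, showing roughly how precise N = 12,500,000 samples make the estimate.

$$
P\left(x^2 + y^2 < 1\right) = \frac{\pi}{4}, \qquad x, y \sim U(0, 1)
$$

$$
\hat{\pi} = \frac{4 \cdot \text{count}}{N}, \qquad
\operatorname{SE}(\hat{\pi}) = 4\sqrt{\frac{p(1 - p)}{N}}, \quad p = \frac{\pi}{4}
$$

$$
N = 1.25 \times 10^{7}: \quad \operatorname{SE}(\hat{\pi}) \approx 4\sqrt{\frac{0.7854 \times 0.2146}{1.25 \times 10^{7}}} \approx 4.6 \times 10^{-4}
$$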
When I use top to check CPU usage, only one core is being used. Why is that? Secondly, the Spark documentation says that the default parallelism is held in the property spark.default.parallelism. How can I read this property from within my Python program?
Probably because the call to sc.parallelize puts all the data into a single partition. You can specify the number of partitions as the second argument to parallelize:
part = 16
count = sc.parallelize(range(N), part).map(sample).reduce(lambda a, b: a + b)
Note that this still creates the 12.5 million input elements with one CPU on the driver, and only then spreads them across the 16 partitions for the map and reduce steps.
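On the second question: from the driver, sc.defaultParallelism gives the partition count that parallelize uses when no second argument is passed, and sc.getConf().get("spark.default.parallelism") returns the property only if it was set explicitly (otherwise None). In local mode the default falls back to the number of worker threads. The question's script passes "local" to SparkContext, which means a single thread, and a master set in code takes precedence over spark-submit's --master, so local[*] is likely being ignored. With one thread, even 16 partitions are processed one after another. Creating the context as SparkContext(appName="Test App") lets --master local[*] take effect. The helper below, parallelism_report, is a hypothetical name used for illustration, not a Spark API; it only reads attributes that PySpark's SparkContext and SparkConf provide.

```python
def parallelism_report(sc):
    """Hypothetical helper: collect parallelism-related settings of a SparkContext.

    `sc` is expected to be a pyspark.SparkContext; only sc.defaultParallelism
    and sc.getConf() are used, so any object exposing them works.
    """
    conf = sc.getConf()  # a copy of the context's SparkConf
    return {
        # e.g. "local" (one worker thread) or "local[*]" (one thread per core)
        "master": conf.get("spark.master"),
        # partition count used by sc.parallelize(data) when numSlices is omitted
        "defaultParallelism": sc.defaultParallelism,
        # None unless set via --conf, spark-defaults.conf or SparkConf
        "spark.default.parallelism": conf.get("spark.default.parallelism"),
    }
```

Calling parallelism_report(sc) right after creating the context in pi.py should show whether the hard-coded "local" master is what limits the job to one core.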
A better approach does most of the work after the partitioning. For example, the following creates only a tiny list on the driver and lets each remote task generate the actual random numbers and test them against the circle:
part = 16
count = (sc.parallelize([0] * part, part)
         # each task draws its own N // part samples
         .flatMap(lambda _: [sample(p) for p in range(N // part)])
         .reduce(lambda a, b: a + b))
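The flatMap version above still returns one 0/1 value per sample to be reduced. A possible refinement, sketched here under assumptions rather than taken from the answer, is to let each task return just its own count, give each task its own seed, and split N so the total stays exact even when N is not a multiple of part (here 12,500,000 / 16 = 781,250, so nothing is lost, but other values would drop the remainder). The names split_counts, count_inside and estimate_pi are hypothetical, and the partitions are simulated sequentially in plain Python, so this shows the per-task logic, not Spark's scheduling.

```python
import random


def split_counts(n, parts):
    """Hypothetical helper: split n samples into `parts` near-equal chunks.

    The first n % parts chunks get one extra sample, so the chunks always
    add up to exactly n.
    """
    base, extra = divmod(n, parts)
    return [base + (1 if i < extra else 0) for i in range(parts)]


def count_inside(seed, n):
    """Work done by ONE task: draw n points, return how many fall inside the
    quarter circle. Only this integer needs to travel back to the driver."""
    rng = random.Random(seed)  # per-task generator, reproducible from its seed
    inside = 0
    for _ in range(n):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            inside += 1
    return inside


def estimate_pi(n, parts, base_seed=0):
    """Sequential stand-in for the cluster: each (seed, size) pair plays the
    role of one partition's single element."""
    sizes = split_counts(n, parts)
    counts = [count_inside(base_seed + i, size) for i, size in enumerate(sizes)]
    return 4.0 * sum(counts) / n
```

In Spark the same idea should translate to roughly sc.parallelize(list(enumerate(split_counts(N, part))), part).map(lambda t: count_inside(*t)).sum(), where each of the part elements is a (seed, size) pair handled by one task; this mapping is an untested sketch.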
Finally (because the lazier we are, the better), Spark MLlib already ships with random data generation that is nicely parallelized; have a look here: http://spark.apache.org/docs/1.1.0/mllib-statistics.html#random-data-generation. So something like the following is probably close to what you are trying to do (not tested, so it may need adjustments):
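from pyspark.mllib.random import RandomRDDs
# Two RDDs of U(0, 1) values with identical size and partition count,
# which is what zip() needs to pair them element by element.
count = (RandomRDDs.uniformRDD(sc, N, part)
         .zip(RandomRDDs.uniformRDD(sc, N, part))
         .filter(lambda xy: xy[0]**2 + xy[1]**2 < 1)
         .count())
print("Pi is roughly %f" % (4.0 * count / N))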