Diving deep into Spark (PySpark) for the first time, and I'm already flooded with under-the-hood curiosity.
From the docs, I understand that when creating an RDD, Spark splits the data evenly across the available partitions. For example...
rdd = spark.sparkContext.parallelize([("A", 1), ("A", 2), ("A", 3), ("B", 1), ("B", 2), ("C", 1), ("C", 2), ("C", 3)])
So my understanding is that if there are 4 partitions, then the following is true...
P1: ("A", 1), ("A", 2)
P2: ("A", 3), ("B", 1)
P3: ("B", 2), ("C", 1)
P4: ("C", 2), ("C", 3)
QUESTION 1: I get that each partition will always get 2 elements, but will they always get the SAME two elements? i.e. if I run the rdd code above 1 million times, will each element land in the same partition every time?
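For what it's worth, glom() looks like a way to peek at the actual layout (a quick sketch, assuming I force 4 partitions with the numSlices argument):
rdd = spark.sparkContext.parallelize([("A", 1), ("A", 2), ("A", 3), ("B", 1), ("B", 2), ("C", 1), ("C", 2), ("C", 3)], 4)
print(rdd.getNumPartitions())  # 4
print(rdd.glom().collect())    # glom() returns one list per partition, so this shows which elements sit where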
My next question has to do with groupByKey(). Say we are finding the sum for each key...
sums = rdd.groupByKey().mapValues(sum)
Here is where Spark loses me. I know there are still 4 partitions, but I'm not sure how the data is distributed after the shuffle caused by the grouping.
QUESTION 2: How does Spark decide where to send records? And what do the partitions look like after the shuffle?
Here are a few different possible scenarios I can think of after the grouping and sum transformations (with a quick check sketched after them)...
SCENARIO 1 (EVENLY)
P1: ("A", 6)
P2: ("B", 3)
P3: ("C", 6)
P4: EMPTY
SCENARIO 2 (SKEWED)
P1: ("A", 6), ("B", 3)
P2: ("C", 6)
P3: EMPTY
P4: EMPTY
SCENARIO 3 (REALLY SKEWED AND OUT OF ORDER)
P1: EMPTY
P2: EMPTY
P3: ("A", 6), ("B", 3), ("C", 6)
P4: EMPTY
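For what it's worth, I think the same glom() trick from above would show which of these actually happens (a quick sketch using the sums RDD):
print(sums.getNumPartitions())  # still 4 after the shuffle
print(sums.glom().collect())    # one list per partition, so an empty partition shows up as []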
I'm hoping Spark has a process it always follows for distributing data after shuffles. If anyone has any answers I'd love to hear them.
Thanks!
Given an RDD with 4 partitions like this:
val rdd = sc.parallelize(Seq(("A", 1), ("A", 2), ("A", 3), ("B", 1), ("B", 2), ("C", 1), ("C", 2), ("C", 3)), 4)
You will most likely get the same result every time you run that line, because you're using the parallelize method with a small, in-memory sample of data. How data gets partitioned when it is read from a source, on the other hand, depends on several factors: the underlying file system, the type of files being read, the number of executors, the number of driver cores, and so on.
So to answer question 1: no, the partition structure will not necessarily stay the same every time you read the data.
For question 2: whenever you use key-based operations, a shuffle is introduced, which moves records with the same key into the same partition. For example, the following code:
val sumRdd = rdd.groupByKey.mapValues(_.sum)
would still return an RDD with 4 partitions. The data is moved around based on the default partitioning scheme, hash partitioning, which picks the target partition with logic along the lines of:
key.hashCode % numPartitions
So all records whose keys have the same hashCode end up in the same partition, and you will have an underlying structure like:
sumRdd.mapPartitionsWithIndex{ (idx, itr) => itr.toList.map( c => c+" -> partition#"+idx ).iterator }.collect
// Array[String] = Array((A,6) -> partition#1, (B,3) -> partition#2, (C,6) -> partition#3)
sumRdd.getNumPartitions will still return 4, but since one partition is empty, the task for that partition has nothing to process.
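Since you're working in PySpark, a rough equivalent sketch might look like this (note that PySpark's default partitioner hashes keys with Python's hash rather than the JVM hashCode, so the exact partition indices can differ from the Scala output above, and string hashing needs a consistent PYTHONHASHSEED across driver and executors):
# PySpark sketch (assumes the same 8 records, forced into 4 partitions)
rdd = spark.sparkContext.parallelize(
    [("A", 1), ("A", 2), ("A", 3), ("B", 1), ("B", 2), ("C", 1), ("C", 2), ("C", 3)], 4)
sums = rdd.groupByKey().mapValues(sum)
print(sums.getNumPartitions())  # still 4
# Same "hash the key, mod the partition count" idea as the Scala version above
for key in ["A", "B", "C"]:
    print(key, "->", "partition#%d" % (hash(key) % sums.getNumPartitions()))
# One list per partition; the empty partition shows up as []
print(sums.glom().collect())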