Tags: apache-spark, rdd, partitioning

Spark RDD Partition effects


I am confused by the repartition operation. Please see the code below:

import org.apache.spark._
import org.apache.log4j._

object FriendsByAge {

  def parseLine(line: String): (Int, Int) = {
    val fields = line.split(",")
    val age = fields(2).toInt
    val numFriends = fields(3).toInt
    (age, numFriends)
  }

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.ERROR)

    val sc = new SparkContext("local[*]", "FriendsByAge")

    val lines = sc.textFile("./data/fakefriends-noheader.csv").repartition(1000)
    val rdd = lines.map(parseLine)

    println(rdd.getNumPartitions)             // prints 1000

    val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

    println(totalsByAge.getNumPartitions)     // prints 1000

    val averagesByAges = totalsByAge.mapValues(x => x._1 / x._2)

    println(averagesByAges.getNumPartitions)  // prints 1000

    val results = averagesByAges.collect()

    results.sortWith(_._2 > _._2).foreach(println)
  }
}

Here I repartition the RDD into 1000 partitions after reading the file. Since the map operation creates a new RDD, and partitioning is (as I understand it) not preserved, I expected the partition count to change; yet I still see the same number of partitions at every step.

The question is: how do I know whether a child RDD will preserve the parent RDD's partitioning? What are the criteria for a repartition being invalidated by a child RDD?


Solution

  • mapValues does not alter the partitioning already in effect; it is a narrow transformation. You have two of them here.

  • reduceByKey requires an associative reduce function. Spark aggregates locally within each partition and then shuffles those partial results to the relevant output partitions. If you do not pass reduceByKey its optional numPartitions argument, the new RDD keeps the same number of partitions as its parent, albeit with a different distribution of the data. Both points are illustrated in the sketch below.
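
To make both points concrete, here is a minimal, self-contained sketch. The object name, the sample pairs, and the expected values in the comments are illustrative only, and the counts assume a default local run where spark.default.parallelism has not been set explicitly:

import org.apache.spark._

object PartitioningDemo {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext("local[*]", "PartitioningDemo")

    // A small pair RDD spread over 8 partitions; parallelize only splits
    // the data, it does not install a partitioner.
    val pairs = sc.parallelize(Seq((33, 385), (33, 2), (55, 221), (40, 465)), 8)
    println(pairs.partitioner)                         // None
    println(pairs.getNumPartitions)                    // 8

    // reduceByKey without numPartitions: the new RDD inherits the parent's
    // partition count (8) and gains a hash partitioner over it.
    val summed = pairs.reduceByKey(_ + _)
    println(summed.partitioner.isDefined)              // true
    println(summed.getNumPartitions)                   // 8

    // mapValues is narrow: keys cannot change, so both the partitioner
    // and the partition count are preserved.
    val scaled = summed.mapValues(_ * 2)
    println(scaled.partitioner == summed.partitioner)  // true
    println(scaled.getNumPartitions)                   // 8

    // map may change keys, so Spark drops the partitioner, even though
    // the partition count itself is kept.
    val mapped = summed.map { case (k, v) => (k, v) }
    println(mapped.partitioner)                        // None
    println(mapped.getNumPartitions)                   // 8

    // Passing numPartitions explicitly overrides the inherited count.
    val resized = pairs.reduceByKey(_ + _, 4)
    println(resized.getNumPartitions)                  // 4

    sc.stop()
  }
}

The general rule this illustrates: narrow transformations such as mapValues and filter preserve both the partition count and the partitioner; map and flatMap keep the count but drop the partitioner, since they may change the keys; and shuffle transformations such as reduceByKey reuse the parent's partition count unless you override it with numPartitions.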