Search code examples
apache-sparkrddpartitionpersist

Spark: persist and repartition order


I have the following code:

val data = input.map{... }.persist(StorageLevel.MEMORY_ONLY_SER).repartition(2000)

I am wondering what's the difference if I do the repartition first like:

val data = input.map{... }.repartition(2000).persist(StorageLevel.MEMORY_ONLY_SER)

Is there a difference in the order of calling reparation and persist? Thanks!


Solution

  • Yes, there is a difference.

    In the first case you get persist RDD after map phase. It means that every time data is accessed it will trigger repartition.

    In the second case you cache after repartitioning. When data is accessed, and has been previously materialized, there is no additional work to do.

    To prove lets make an experiment:

    import  org.apache.spark.storage.StorageLevel
    
    val data1 = sc.parallelize(1 to 10, 8)
      .map(identity)
      .persist(StorageLevel.MEMORY_ONLY_SER)
      .repartition(2000)
    data1.count()
    
    val data2 = sc.parallelize(1 to 10, 8)
      .map(identity)
      .repartition(2000)
      .persist(StorageLevel.MEMORY_ONLY_SER)
    data2.count()
    

    and take a look at the storage info:

    sc.getRDDStorageInfo
    
    // Array[org.apache.spark.storage.RDDInfo] = Array(
    //   RDD "MapPartitionsRDD" (17) StorageLevel:
    //       StorageLevel(false, true, false, false, 1);
    //     CachedPartitions: 2000; TotalPartitions: 2000; MemorySize: 8.6 KB; 
    //     ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B,
    //   RDD "MapPartitionsRDD" (7) StorageLevel:
    //      StorageLevel(false, true, false, false, 1);
    //    CachedPartitions: 8; TotalPartitions: 8; MemorySize: 668.0 B; 
    //    ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B)
    

    As you can see there are two persisted RDDs, one with 2000 partitions, and one with 8.