python, apache-spark, pyspark, distributed

How to force repartitioning in a spark dataframe?


I have many spark dataframes on which I need to do the following:

1) load a single spark dataframe
2) select rows from it
3) merge it with all of the previous spark dataframes

Now, each of the above operations requires a different number of partitions. Selecting rows requires many partitions, like 100. Merging requires very few, like 10.

So, I really want it to work like this:

1) load a single spark dataframe
1.5) repartition into 100 partitions
2) select rows from it
2.5) repartition into 10 partitions
3) merge it with all of the previous spark dataframes

Now, how do I force this to repartition in between steps 1 and 2 and in between 2 and 3?

I know that when I call `data = data.repartition(7)` it is lazily evaluated, so it only actually repartitions when the data is saved.

So, I have been doing it like this:

1) load a single spark dataframe
1.5) repartition into 100 partitions
1.75) `df.count()` *just* to force materialization
2) select rows from it
2.5) repartition into 10 partitions
2.75) `df.count()` *just* to force materialization
3) merge it with all of the previous spark dataframes

Is there a better way to force the repartitioning in between these steps? Is there a better way than running count() on the dataframe?
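
For reference, here is roughly what that currently looks like for one dataframe in PySpark (the path, the filter condition, and the `merged` accumulator are just placeholders):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    merged = None  # accumulates the previously processed dataframes

    # 1) load a single dataframe (placeholder path)
    df = spark.read.parquet("/path/to/one/input")

    # 1.5) + 1.75) repartition for the row selection, then count() just to force it
    df = df.repartition(100)
    df.count()

    # 2) select rows (placeholder condition)
    df = df.filter(F.col("value") > 0)

    # 2.5) + 2.75) repartition for the merge, then count() again just to force it
    df = df.repartition(10)
    df.count()

    # 3) merge with everything processed so far
    merged = df if merged is None else merged.union(df)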


Solution

  • As all transformations of dataframes in Spark are lazily evaluated, you need to perform an action to actually execute them. Currently there is no other way to force the transformations.

    All available dataframe actions can be found in the documentation (look under actions). In your case, instead of using count() to force the transformation, you could use first(), which should be significantly cheaper.

    In step 2.5 you could replace the repartition() with coalesce(), as that avoids a full shuffle. This is advantageous when the new number of partitions is lower than before, since it minimizes the data movement.
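
    Combining both suggestions, steps 1.5 through 2.75 could then look roughly like this in PySpark (a sketch only; `df` is assumed to be the dataframe loaded in step 1, and the actual row selection is left out):

    # 1.5) + 1.75) repartition, then a cheap action instead of count()
    df = df.repartition(100)
    df.first()

    # 2) ... your row selection goes here ...

    # 2.5) + 2.75) coalesce avoids the full shuffle when only reducing the
    # number of partitions; first() again forces the evaluation
    df = df.coalesce(10)
    df.first()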

    EDIT:

    To answer your question about what happens if you do not use any action and simply do 1) repartition, 2) a dataframe transformation, 3) repartition: due to the optimizations Spark performs on the transformations, this order does not always seem to be followed. I made a small test program to check:

    import spark.implicits._  // needed for toDF and the $ column syntax (pre-imported in spark-shell)
    val df = spark.sparkContext.parallelize(Array((1.0,"a"),(2.0,"b"),(3.0,"c"),(1.0,"d"),(2.0,"e"),(3.0,"f"))).toDF("x", "y")
    val df1 = df.repartition(10).filter($"x" =!= 1.0).repartition(5).filter($"y" =!= "b")
    df1.explain(true)
    

    This prints information about how the dataframe will be computed.

    == Parsed Logical Plan ==
    'Filter NOT ('y = b)
    +- Repartition 5, true
       +- Filter NOT (x#5 = 1.0)
          +- Repartition 10, true
             +- Project [_1#2 AS x#5, _2#3 AS y#6]
                +- LogicalRDD [_1#2, _2#3]
    
    == Analyzed Logical Plan ==
    x: double, y: string
    Filter NOT (y#6 = b)
    +- Repartition 5, true
       +- Filter NOT (x#5 = 1.0)
          +- Repartition 10, true
             +- Project [_1#2 AS x#5, _2#3 AS y#6]
                +- LogicalRDD [_1#2, _2#3]
    
    == Optimized Logical Plan ==
    Repartition 5, true
    +- Project [_1#2 AS x#5, _2#3 AS y#6]
       +- Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b))
          +- LogicalRDD [_1#2, _2#3]
    
    == Physical Plan ==
    Exchange RoundRobinPartitioning(5)
    +- *Project [_1#2 AS x#5, _2#3 AS y#6]
       +- *Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b))
          +- Scan ExistingRDD[_1#2,_2#3]
    

    As can be seen here, the repartition(10) step is not included and seems to have been removed during the optimization.
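
    For reference, a roughly equivalent check in PySpark (the exact plan output, and whether the first repartition survives, may vary with the Spark version):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()

    # same toy data as the Scala test above
    df = spark.createDataFrame(
        [(1.0, "a"), (2.0, "b"), (3.0, "c"), (1.0, "d"), (2.0, "e"), (3.0, "f")],
        ["x", "y"])
    df1 = (df.repartition(10)
             .filter(F.col("x") != 1.0)
             .repartition(5)
             .filter(F.col("y") != "b"))
    df1.explain(True)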