Tags: performance, apache-spark, dataframe, apache-spark-sql, rdd

Why would one use DataFrame.select over DataFrame.rdd.map (or vice versa)?


Is there any "mechanical" difference between using select on a DataFrame to pick out the columns we need and mapping each row of the underlying RDD for the same purpose?

By "mechanical" I am referring to the the mechanism that performs the operations. Implementation details, in other words.

Which of the two is better/more performant?

df = # create dataframe ...
df.select("col1", "col2", ...)

or

df = # create dataframe ...
df.rdd.map(lambda row: (row[0], row[1], ...))

I am in the middle of performance testing, so I am going to find out which is faster, but I would like to know what the implementation differences and pros/cons are.


Solution

  • In this oversimplified example with DataFrame.select and DataFrame.rdd.map I think the difference might be almost negligible.

    After all, you've already loaded your data set and are only doing a projection. Eventually both would have to deserialize the data from Spark's internal binary row format (InternalRow) to compute the result for an action.
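
    To make the comparison concrete, here is a minimal Scala sketch of the two variants (the toy DataFrame and the column name "doubled" are mine, not from the question):

    // A toy two-column DataFrame; "doubled" is a made-up column name.
    val df = spark.range(5).select('id, ('id * 2) as "doubled")

    // Variant 1: stays in the Dataset API, operates on InternalRow directly
    // and goes through the Catalyst optimizer and whole-stage code generation.
    val viaSelect = df.select("id", "doubled")

    // Variant 2: drops to the RDD API; every InternalRow is first
    // deserialized into an external Row before the lambda ever sees it.
    val viaRddMap = df.rdd.map(row => (row.getLong(0), row.getLong(1)))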

    You can check what happens with DataFrame.select using explain(extended = true), where you will see the logical plans (and the physical plan, too).

    scala> spark.version
    res4: String = 2.1.0-SNAPSHOT
    
    scala> spark.range(5).select('id).explain(extended = true)
    == Parsed Logical Plan ==
    'Project [unresolvedalias('id, None)]
    +- Range (0, 5, step=1, splits=Some(4))
    
    == Analyzed Logical Plan ==
    id: bigint
    Project [id#17L]
    +- Range (0, 5, step=1, splits=Some(4))
    
    == Optimized Logical Plan ==
    Range (0, 5, step=1, splits=Some(4))
    
    == Physical Plan ==
    *Range (0, 5, step=1, splits=Some(4))
    

    Compare the physical plan (i.e. SparkPlan) to what you're doing with rdd.map (by toDebugString) and you'll know what might be "better".

    scala> spark.range(5).rdd.toDebugString
    res5: String =
    (4) MapPartitionsRDD[8] at rdd at <console>:24 []
     |  MapPartitionsRDD[7] at rdd at <console>:24 []
     |  MapPartitionsRDD[6] at rdd at <console>:24 []
     |  MapPartitionsRDD[5] at rdd at <console>:24 []
     |  ParallelCollectionRDD[4] at rdd at <console>:24 []
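
    If you add the map step itself, it shows up as yet another MapPartitionsRDD in the lineage -- an opaque function Spark cannot look into or optimize. A sketch (toDF is there only to get an RDD of Rows, matching the question's row[0] style):

    // The lambda becomes one more MapPartitionsRDD on top of a chain like the
    // one above; its body is a black box to Spark.
    spark.range(5).toDF("id").rdd
      .map(row => row.getLong(0))
      .toDebugString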
    

    (Again, in this contrived example I think there is no winner -- both are as efficient as possible.)

    Please note that a DataFrame is really a Dataset[Row], which uses a RowEncoder to encode (i.e. serialize) the data into Spark's internal binary row format (InternalRow). If you were to execute more operators in a pipeline, you could get much better performance by sticking to Dataset rather than RDD, simply because of the low-level, behind-the-scenes logical query plan optimizations and the compact binary format.
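
    As a sketch of what "more operators in a pipeline" buys you (the filter condition and the column name "bucket" below are made up), compare how much Catalyst can do for the Dataset version with the equivalent RDD chain:

    val df = spark.range(1000000).select('id, ('id % 10) as "bucket")

    // Dataset/DataFrame pipeline: Catalyst can collapse the projections,
    // prune unused columns and push the filter towards the source;
    // explain(true) shows the optimized logical plan it came up with.
    df.filter('bucket === 0).select('id).explain(true)

    // Equivalent RDD pipeline: each lambda is a black box to Spark, so the
    // rows are deserialized and every step runs exactly as written.
    val viaRdd = df.rdd
      .filter(row => row.getLong(1) == 0)
      .map(row => row.getLong(0))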

    There are a lot of optimizations in place, and trying to beat them is often a waste of your time. You'd have to know the Spark internals by heart to do better (and the price would certainly be readability).

    There's a lot to it, and I'd strongly recommend watching Herman van Hovell's talk A Deep Dive into the Catalyst Optimizer to understand and appreciate all the optimizations.

    My take on it is..."Stay away from RDDs unless you know what you're doing".