Tags: csv, apache-spark-sql, rdd

Difference between loading a csv file into RDD and Dataframe in spark


I am not sure if this specific question has been asked before; it could be a duplicate, but I was not able to find a use case pertaining to this.

As we know, we can load a csv file directly into a dataframe, or we can load it into an RDD first and then convert that RDD into a dataframe later.

RDD = sc.textFile("pathlocation")

We can apply map, filter and other operations on this RDD and then convert it into a dataframe.
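
For instance, a minimal sketch of that flow (the column names id and amount, the header check and the file layout are hypothetical; it assumes an existing SparkSession so that toDF is available):

parsed = RDD.map(lambda line: line.split(","))                      # split each raw line into fields
parsed = parsed.filter(lambda fields: fields[0] != "id")            # drop the header row, if any
parsed = parsed.map(lambda fields: (fields[0], float(fields[1])))   # cast the second field to a number

Dataframe = parsed.toDF(["id", "amount"])                           # convert the cleaned RDD into a dataframe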

We can also create a dataframe directly by reading a csv file:

Dataframe = spark.read.format("csv").schema(schema).option("header","false").load("pathlocation")

My question is: what are the use cases where we have to load a file into an RDD first and convert it into a dataframe?

I just know that textFile reads data line by line. What are the scenarios where we should choose the RDD method over the dataframe one?


Solution

  • DataFrames / Datasets offer a huge performance improvement over RDDs because of two powerful features:

    1. Custom Memory Management (aka Project Tungsten)
      Data is stored in off-heap memory in a binary format, which saves a lot of memory space, and there is no garbage collection overhead involved. Because the schema of the data is known in advance and it is stored efficiently in binary format, expensive Java serialization is also avoided.

    2. Optimized Execution Plans (aka Catalyst Optimizer)
      Query plans are created for execution using the Spark Catalyst optimizer. After an optimized execution plan is prepared through several steps, the final execution happens internally on RDDs only, but that is completely hidden from the user. You can inspect the plans Catalyst produces with explain(), as shown in the sketch at the end of this answer.

    In general, you should never use RDDs unless you want to handle the low-level optimizations / serialization yourself.

    Custom partitioner implementation in PySpark, with RDDs:

    import random

    def partitionFunc(key):
        # Route two specific keys to partition 0, spread everything else randomly over partitions 1 and 2
        if key == 17850 or key == 12583:
            return 0
        else:
            return random.randint(1, 2)

    # You can call the partitioner as below:
    keyedRDD = rdd.keyBy(lambda row: row[6])
    keyedRDD\
        .partitionBy(3, partitionFunc)\
        .map(lambda x: x[0])\
        .glom()\
        .map(lambda x: len(set(x)))\
        .take(5)
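
    To see the Catalyst Optimizer from point 2 at work, you can ask any dataframe to print its query plans; a minimal sketch (it assumes the dataframe from the question is named Dataframe and has hypothetical id and amount columns):

    # Catalyst rewrites this query through parsed, analyzed, optimized and physical plans
    filtered = Dataframe.filter(Dataframe["amount"] > 100).select("id")
    filtered.explain(True)   # prints all four plans; the physical plan is what runs on RDDs internally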