python · apache-spark · apache-spark-sql · rdd

Spark RDD and DataFrame transformation optimisation


I am new to Spark and have the following high-level question about RDDs and DataFrames (which, if I'm not mistaken, are built on top of RDDs):

I understand that there are two types of operations that can be performed on RDDs: transformations and actions. I also understand that transformations are only executed when an action is performed on an RDD that is a product of those transformations. Given that RDDs live in memory, I was wondering whether it is possible to optimise the amount of memory they consume. Take the following example:

    import time

    from pyspark.sql.functions import udf, unix_timestamp
    from pyspark.sql.types import DoubleType

    # Parse the Kafka event timestamp and stamp the Spark arrival time.
    KafkaDF = KafkaDFRaw.select(
        KafkaDFRaw.key,
        KafkaDFRaw.value,
        KafkaDFRaw.topic,
        unix_timestamp('timestamp',
                       'yyyy-MM-dd HH:mm:ss').alias('kafka_arrival_time')
    ).withColumn("spark_arrival_time", udf(time.time, DoubleType())())
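
To make the transformation/action distinction concrete, here is a minimal, self-contained sketch (assuming a local SparkSession; the DataFrame here is a stand-in, not my Kafka source). Nothing is computed until the action on the last line:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    df = spark.range(1_000_000)                           # lazy source
    doubled = df.withColumn("doubled", F.col("id") * 2)   # transformation: builds a plan only
    filtered = doubled.filter(F.col("doubled") % 4 == 0)  # still lazy, still no execution

    print(filtered.count())  # action: the optimised plan runs here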

I have a KafkaDFRaw DataFrame from which I produce a new DataFrame called KafkaDF, and I then wish to add columns to it. Should I reassign the result back to the same variable? Like so:

    decoded_value_udf = udf(lambda value: value.decode("utf-8"))

    KafkaDF = KafkaDF\
        .withColumn(
            "cleanKey", decoded_value_udf(KafkaDF.key))\
        .withColumn(
            "cleanValue", decoded_value_udf(KafkaDF.value))

Or should I create a new DataFrame from the last one? Like so:

    decoded_value_udf = udf(lambda value: value.decode("utf-8"))

    KafkaDF_NEW = KafkaDF\
        .withColumn(
            "cleanKey", decoded_value_udf(KafkaDF.key))\
        .withColumn(
            "cleanValue", decoded_value_udf(KafkaDF.value))

Does this make a difference in terms of memory optimisation?

Thank you in advance for your help.


Solution

  • Whenever an action is called, the optimised DAG is executed and memory is used according to that plan. You can compare the execution plans of the two approaches to confirm this:

    KafkaDF.explain(True)
    KafkaDF_NEW.explain(True)
    

    Creating an extra variable in between to hold the transformations does not affect memory utilisation: both names are just references to a lazy logical plan, and nothing is materialised until an action runs. Actual memory requirements depend on data size, partition sizes, shuffles, caching, and so on.
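
    As a minimal sketch of why the two styles are equivalent (assuming Spark 3.1+ for sameSemantics, with a toy two-column DataFrame standing in for the Kafka source), both chains build the same logical plan and neither triggers any execution:

        from pyspark.sql import SparkSession
        from pyspark.sql import functions as F

        spark = SparkSession.builder.master("local[*]").getOrCreate()
        base = spark.createDataFrame(
            [(bytearray(b"k1"), bytearray(b"v1"))], ["key", "value"])

        decoded_value_udf = F.udf(lambda value: value.decode("utf-8"))

        # Style 1: reassign the same variable.
        df = base
        df = df.withColumn("cleanKey", decoded_value_udf(df.key))

        # Style 2: bind the result to a new variable.
        df_new = base.withColumn("cleanKey", decoded_value_udf(base.key))

        # Same canonicalised plan either way; no data has been read yet.
        print(df.sameSemantics(df_new))  # expected: True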