I am new to Spark and have the following high-level question about RDDs and DataFrames, which, if I'm not mistaken, are built on top of RDDs:
I understand that there are two types of operations that can be performed on RDDs: transformations and actions. I also understand that transformations are only executed when an action is performed on an RDD that is a product of those transformations. Given that RDDs live in memory, I was wondering whether there is some way to optimise the amount of memory they consume. Take the following example:
import time

from pyspark.sql.functions import udf, unix_timestamp
from pyspark.sql.types import DoubleType

# parse the Kafka timestamp and record the time Spark processes each row
KafkaDF = KafkaDFRaw.select(
    KafkaDFRaw.key,
    KafkaDFRaw.value,
    KafkaDFRaw.topic,
    unix_timestamp('timestamp',
                   'yyyy-MM-dd HH:mm:ss').alias('kafka_arrival_time')
).withColumn("spark_arrival_time", udf(time.time, DoubleType())())
I have a KafkaDFRaw DataFrame and I produce a new DataFrame called KafkaDF from it. I then wish to add columns to this new DataFrame. Should I add them to the existing one, like so:
decoded_value_udf = udf(lambda value: value.decode("utf-8"))

# decode the raw bytes, reassigning the result to the same variable
KafkaDF = KafkaDF\
    .withColumn(
        "cleanKey", decoded_value_udf(KafkaDF.key))\
    .withColumn(
        "cleanValue", decoded_value_udf(KafkaDF.value))
Or should I create a new DataFrame from the last one, like so:
decoded_value_udf = udf(lambda value: value.decode("utf-8"))

# decode the raw bytes, assigning the result to a new variable instead
KafkaDF_NEW = KafkaDF\
    .withColumn(
        "cleanKey", decoded_value_udf(KafkaDF.key))\
    .withColumn(
        "cleanValue", decoded_value_udf(KafkaDF.value))
Does this make a difference in terms of memory optimisation?
Thank you in advance for your help.
Whenever an action is called, the optimized DAG is executed and memory is used according to that plan. You can compare the execution plans of the two versions to see this:
df.explain(True)      # plan for the version that reassigns the same variable
df_new.explain(True)  # plan for the version assigned to a new variable
Creating an extra variable in between to hold the transformations does not impact memory utilization: the variable only references a lazy query plan, not materialized data. Memory requirements will depend on the data size, partition sizes, shuffles, etc.
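If it helps, here is a minimal, self-contained sketch (toy data and a local SparkSession, not your Kafka schema) showing that both styles end up with identical plans, so there is no difference in memory behaviour:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.master("local[*]").appName("plan-comparison").getOrCreate()

# toy stand-in for the Kafka DataFrame (binary key/value, made-up rows)
base = spark.createDataFrame(
    [(bytearray(b"k1"), bytearray(b"v1")), (bytearray(b"k2"), bytearray(b"v2"))],
    ["key", "value"])

decoded_value_udf = udf(lambda value: value.decode("utf-8"))

# style 1: keep reassigning the same variable
df = base
df = df.withColumn("cleanKey", decoded_value_udf(df.key)) \
       .withColumn("cleanValue", decoded_value_udf(df.value))

# style 2: bind the result to a new variable
df_new = base.withColumn("cleanKey", decoded_value_udf(base.key)) \
             .withColumn("cleanValue", decoded_value_udf(base.value))

# both print the same parsed/optimized/physical plans; each variable is just
# a handle on a lazy plan, and nothing is held in memory until an action runs
df.explain(True)
df_new.explain(True)

spark.stop()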