I have a DataFrame named dataframe_1, which is referenced in multiple parts of my PySpark code. To optimize performance, I intend to cache dataframe_1 so that subsequent references use the cached data rather than recomputing it on each reference.
Original Code:
from pyspark.sql.functions import col, concat, lit, trim, when

dataframe_1 = spark.read.format("delta").table("schema.table_name")
dataframe_1 = dataframe_1.filter(condition).select(list_of_columns_needed)

# Planning to use cache here: dataframe_1.cache(), so that the cached data
# can be used by the multiple references below.

# usage-1 of dataframe_1
columns_to_be_renamed = ["col_1", "col_2", "col_3"]
for c in columns_to_be_renamed:
    dataframe_1 = dataframe_1.withColumn(
        c,
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )
# usage-2 and other references of dataframe_1
...
I need to rename all column values. In the loop where I rename the values of the columns listed in columns_to_be_renamed, I had to keep the DataFrame name as dataframe_1. If I instead assign to dataframe_2 as below (I want a new DataFrame so that I can cache the old one), only the last column is updated, because each iteration starts again from the unmodified dataframe_1, so only the final withColumn survives. That is not the expected behavior; ideally all columns should be updated, as in the code above:
columns_to_be_renamed = ["col_1", "col_2", "col_3"]
for c in columns_to_be_renamed:
    dataframe_2 = dataframe_1.withColumn(
        c,
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )
Given this, should I cache the DataFrame dataframe_1 immediately after reading it, or should I cache it after the loop? My concern is that if I cache immediately after reading, subsequent references to dataframe_1 after the loop might refer to the uncached version from the loop instead of the cached one.
dataframe_1 = spark.read.format("delta").table("schema.table_name")
dataframe_1 = dataframe_1.filter(condition).select(list_of_columns_needed)

# New changes
dataframe_1.cache()
dataframe_2 = dataframe_1

# usage-1 of dataframe_1
columns_to_be_renamed = ["col_1", "col_2", "col_3"]
for c in columns_to_be_renamed:
    dataframe_2 = dataframe_2.withColumn(
        c,
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )
# usage-2 and other references of dataframe_1
...
I have validated that when we do dataframe_2 = dataframe_1 and later transform dataframe_2, dataframe_1 is not affected and retains its initial state, as expected (DataFrames are immutable; each transformation returns a new DataFrame).
Is this the only way to handle it, or are there better ways to deal with such use cases?
This is a good way to handle it, but test the performance of .cache() versus simply writing the data out for your use case. In a current project I found .cache() to be 2-3x slower than writing the DataFrame to disk and reading it back.
Additionally, don't use withColumn in a loop. If you are transforming a large number of columns it will hit your performance heavily, because each call adds another projection to the query plan. Use a single select directly, since you know the fields.