apache-spark, pyspark

Reusing the same DataFrame via cache


I have a DataFrame named dataframe_1, which is referenced in multiple parts of my PySpark code. To optimize performance, I intend to cache dataframe_1 so that subsequent references use the cached data rather than re-reading it multiple times.

Original Code:

from pyspark.sql.functions import col, concat, lit, trim, when

dataframe_1 = spark.read.format("delta").table("schema.table_name")
dataframe_1 = dataframe_1.filter(condition).select(list_of_columns_needed)

# Planning to use cache here: dataframe_1.cache(), so that the cached data can be used by the multiple references below.

# usage-1 of dataframe_1
columns_to_be_renamed = ["col_1","col_2","col_3"]
for c in columns_to_be_renamed:
    dataframe_1 = dataframe_1.withColumn(c, 
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )

# usage-2 and other references of dataframe_1
...

I need to transform the values of all these columns. In the loop where I modify the values of the columns listed in columns_to_be_renamed, I had to keep the DataFrame name as dataframe_1. If I instead assign to dataframe_2 as below (I want a new DataFrame so that I can cache the old one), only the last column is updated, because each iteration starts again from the original dataframe_1 and overwrites dataframe_2 instead of building on the previous iteration's result. That is not the expected behavior; all columns should be updated, as in the code above:

columns_to_be_renamed = ["col_1","col_2","col_3"]
for c in columns_to_be_renamed:
    dataframe_2 = dataframe_1.withColumn(c, 
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )

Given this, should I cache dataframe_1 immediately after reading it, or should I cache it after the loop? My concern is that if I cache immediately after reading, subsequent references after the loop might pick up the transformed, uncached DataFrame built in the loop instead of the cached one.

dataframe_1 = spark.read.format("delta").table("schema.table_name")
dataframe_1 = dataframe_1.filter(condition).select(list_of_columns_needed)

# New changes
dataframe_1.cache()
dataframe_2 = dataframe_1

# usage-1 of dataframe_1
columns_to_be_renamed = ["col_1","col_2","col_3"]
for c in columns_to_be_renamed:
    dataframe_2 = dataframe_2.withColumn(c, 
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )

# usage-2 and other references of dataframe_1
...

I have validated this case: when we do dataframe_2 = dataframe_1 and later change dataframe_2, dataframe_1 is not updated and retains its initial data/state, as expected.
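
One way to confirm this (a sketch; the literal value is only for illustration): compare the plans. DataFrames are immutable, and assignment only rebinds the Python name:

from pyspark.sql.functions import lit

dataframe_2 = dataframe_1
dataframe_2 = dataframe_2.withColumn("col_1", lit("x"))

dataframe_1.explain()  # plan does not include the new projection
dataframe_2.explain()  # plan includes the extra projection added above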

Is this the only way to handle it, or are there better ways to deal with such use cases?


Solution

  • This is a good way to handle it, but test the performance of .cache() vs simply re-writing the data for your use case. In a current project I have found that .cache() is 2-3x slower than simply writing the DataFrame to disk and referring to that.
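
    A minimal sketch of the write-and-reread alternative (the path here is an assumption, not from the question; the delta format is reused from it):

        # Materialise the filtered data once, then have every later reference read the written copy
        dataframe_1.write.mode("overwrite").format("delta").save("/tmp/dataframe_1_materialised")
        dataframe_1 = spark.read.format("delta").load("/tmp/dataframe_1_materialised")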

    Additionally, don't use withColumn; particularly if you are transforming a large number of columns, it will hit your performance heavily. Use select directly, since you know the fields.
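
    For example, the loop from the question could be collapsed into a single select (a sketch reusing the question's column names):

        from pyspark.sql.functions import col, concat, lit, trim, when

        columns_to_be_renamed = ["col_1", "col_2", "col_3"]

        # One projection covering every column, instead of one withColumn per column
        dataframe_2 = dataframe_1.select(
            *[
                when(trim(col(c)) == "", None)
                .otherwise(concat(lit(c), lit("_"), trim(col(c))))
                .alias(c)
                if c in columns_to_be_renamed
                else col(c)
                for c in dataframe_1.columns
            ]
        )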

    Answers to the clarifications below:

    1. Why is cache sometimes slower? The easiest way to understand it is that although both a write and a cache, by default, end up writing to disk, cache also has to attempt to keep the data in memory, and cache/persist to disk may also, by default, go to slower disks. YMMV, and it's always worth testing (see the sketch after this list).
    2. Why is withColumn slow? See https://www.aizoo.info/post/just-one-more-column-what-could-go-wrong: select adds a single projection for all the columns you select, whereas withColumn adds a projection for each column you add. withColumns is also more expensive than select (it needs to perform an additional action to get the actual column names).
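
    Regarding point 1, a small sketch of the storage levels involved (a DataFrame keeps one storage level at a time, so unpersist before switching):

        from pyspark import StorageLevel

        # DataFrame.cache() uses a memory-and-disk default storage level
        # (the exact level name varies by Spark version)
        dataframe_1.cache()

        # A disk-only level skips the attempt to keep blocks in memory
        dataframe_1.unpersist()
        dataframe_1.persist(StorageLevel.DISK_ONLY)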