Search code examples
apache-sparkcachingpysparkapache-spark-sql

Reusing pyspark cache and unpersist in for loop


I have a lot of data that I'm trying to take out in chunks - let's say 3 chunks - and not have it all cached in memory at once. However, I'd like to save it (action) all at the same time afterwards.

This is the current simplified strategy:

for query in [query1,query2,query3]:

    df = spark.sql(query)

    df.cache()

    df1 = df.filter('a')
    df2 = df.filter('b')

    final_output_1 = final_output_1.join(df1)
    final_output_2 = final_output_2.join(df2)

    df.unpersist()


final_output_1.write.saveAsTable()
final_output_2.write.saveAsTable()

So first question: would unpersist() not work here since there hasn't been an action yet on df?

second question: how does df.cache() work here when I'm reusing the df variable in the for loop? I know it's immutable so it would make a copy but would the unpersist() actually clear that memory?


Solution

  • Caching is used in Spark when you want to re use a dataframe again and again ,

    for ex: mapping tables

    once you cache teh df you need an action operation to physicaly move data to memory as spark is based on lazy execution.

    In your case

    df.cache()
    

    will not work as expected as you are not performing an action after this.

    For cache to work you need to run df.count() or df.show() or any other action for the data to be moved to memory , otherwise your data wont be moved to memory and you will not get any advantage. and so the df.unpersist() is also redundant.

    First Question:

    No your df.cache() and df.unpersist() will not work as no data was cached to begin with so their is nothing to unpersist.

    Second Question:

    Yes you can use the same variable name and if an action is performed data will get cached and after your operations df.unpersist() will unpersist the data in each loop. So the previous DF has no connection to the next DF in next loop. As you said they are immutable , and since you are assigning new query to the same variable in each loop it acts as a new DF (not related to previous DF).

    Based on your code i dont think u need to do caching as you are only performing one operation.

    refer to When to cache a DataFrame? and If I cache a Spark Dataframe and then overwrite the reference, will the original data frame still be cached?