Search code examples
apache-sparkapache-spark-sqlapache-spark-2.0

Caching in spark before diverging the flow


I have a basic question regarding working with Spark DataFrame.

Consider the following piece of pseudo code:

val df1 = // Lazy Read from csv and create dataframe
val df2 = // Filter df1 on some condition
val df3 = // Group by on df2 on certain columns
val df4 = // Join df3 with some other df

val subdf1 = // All records from df4 where id < 0
val subdf2 =  // All records from df4 where id > 0

* Then some more operations on subdf1 and subdf2 which won't trigger spark evaluation yet*

// Write out subdf1
// Write out subdf2

Suppose I start of with main dataframe df1(which I lazy read from the CSV), do some operations on this dataframe (filter, groupby, join) and then comes a point where I split this datframe based on a condition (for eg, id > 0 and id < 0). Then I further proceed to operate on these sub dataframes(let us name these subdf1, subdf2) and ultimately write out both the sub dataframes.

Notice that the write function is the only command that triggers the spark evaluation and rest of the functions(filter, groupby, join) result in lazy evaluations.

Now when I write out subdf1, I am clear that lazy evaluation kicks in and all the statements are evaluated starting from reading of CSV to create df1.

My question comes when we start writing out subdf2. Does spark understand the divergence in code at df4 and store this dataframe when command for writing out subdf1 was encountered? Or will it again start from the first line of creating df1 and re-evaluate all the intermediary dataframes? If so, is it a good idea to cache the dataframe df4(Assuming I have sufficient memory)?

I'm using scala spark if that matters. Any help would be appreciated.


Solution

  • No, Spark cannot infer that from your code. It will start all over again. To confirm this, you can do subdf1.explain() and subdf2.explain() and you should see that both dataframes have query plans that start right from the beginning where df1 was read.

    So you're right that you should cache df4 to avoid redoing all the computations starting from df1, if you have enough memory. And of course, remember to unpersist by doing df4.unpersist() at the end if you no longer need df4 for any further computations.