Tags: dataframe, apache-spark, apache-spark-sql, execution, sql-execution-plan

How does Spark DataFrame execution work if one DataFrame depends on another?


I am looking for details on how a Spark DataFrame is executed when it depends on other DataFrames.

E.g., let's say I have three DataFrames: DF1, DF2 and DF3.

DF1 - Reads from table A.

DF2 - Reads from DF1.

DF3 - Joins DF1 and DF2.

When I do DF3.show(), would it also execute DF1 and DF2 in the background?
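
For concreteness, a sketch of the scenario (the table name A is from the question; the filter condition and the join key id are hypothetical placeholders):

    val df1 = spark.read.table("A")     // DF1 reads from table A
    val df2 = df1.filter(df1("id") > 0) // DF2 derives from DF1 (hypothetical filter)
    val df3 = df1.join(df2, Seq("id"))  // DF3 joins DF1 and DF2 (hypothetical key)
    df3.show()                          // the Action in question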


Solution

  • A small question with many aspects.

    • Spark uses lazy evaluation.

    • The show() will trigger an Action, for which a ResultStage is created.

    • The plan built up for the Action will, where possible, fuse narrow transformations (maps, filters) together and optimize them within a Stage, so you may not get a physical DF2 step, but you may. It all depends on whether shuffling occurs across Stages, which is generally the result of wide transformations, i.e. whether you have wide transformations or operations that Spark initiates in the background, e.g. for a pivot.

    • In your case:

      • DF1: yes, as it is read from the source (table A).
      • DF2: as well, since it is part of a JOIN that involves a shuffle; but if it were only the consequence of a simple map and filter, it might be fused away and not appear as a separate step.
      • DF3: as well, as the JOIN involves a shuffle. A sketch after this list shows how to verify this with explain().
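
    To verify the fusing and the shuffle boundaries yourself, a minimal sketch (assuming a Parquet file with country and id columns, as in the code further below) is to print the physical plan; Exchange operators mark the shuffle boundaries:

    val df1 = spark.read.parquet("simple.parquet")
    val df2 = df1.filter(df1("country") === "Holland") // narrow: can be fused into a Stage
    val df3 = df1.join(df2, Seq("id"))                 // wide: introduces a shuffle
    df3.explain(true)                                  // look for Exchange nodes in the physical plan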

    As it is a lengthy discussion, here is a good link: https://mapr.com/blog/how-spark-runs-your-applications/

    I leave caching out, but try the code below and look at the SQL tab as well as the Stages tab in the Spark UI. Also, try it without the JOIN and see what happens (a sketch follows the code).

    // DF1: read from a Parquet source (caching deliberately left commented out)
    val df1 = spark.read.parquet("simple.parquet")//.cache()
    // DF2: a narrow transformation (filter) on DF1
    val df2 = df1.filter(df1("country") === "Holland")
    // DF3: a join of DF1 and DF2, a wide transformation that requires a shuffle
    val df3 = df1.join(df2, Seq("id"))
    // show(false) is the Action that triggers execution of the whole lineage
    df3.show(false)
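
    As suggested above, here is the same pipeline without the JOIN (same assumed simple.parquet input); filter is a narrow transformation, so everything pipelines into a single Stage with no shuffle:

    val df1 = spark.read.parquet("simple.parquet")
    val df2 = df1.filter(df1("country") === "Holland") // narrow only: no shuffle
    df2.show(false)                                    // a single ResultStage; compare the Stages tab with the JOIN version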