apache-spark, pyspark, lazy-evaluation

Save Spark Dataframe Before Writing to Snowflake


I am working in PySpark. I apply a series of transformations and user-defined functions (UDFs) to get a final output table, which I then write to Snowflake. The final write command takes ~25 minutes because Spark evaluates lazily: nothing is computed until that final call, so the write also performs all the calculations. I want the final table to be evaluated in an earlier step so I can time how long the transformations take and, separately, how long the write to Snowflake takes. How do I separate the two? I have tried:

temp = final_df.show() 

temp.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
.option("dbtable","TEST_SPARK").save()

But I get the error:

'NoneType' object has no attribute 'write'

And with collect():

temp = final_df.collect() 

temp.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
.option("dbtable","TEST_SPARK").save()

But I get the error:

'list' object has no attribute 'write'

Solution

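  • temp = final_df.show() stores the return value of .show(). show() prints the rows and returns None, so temp is None. Only a DataFrame has the .write method for saving to external sources. Similarly, collect() returns a Python list, which has no .write method either.

    Keep a reference to the DataFrame, call .show() on it only to view the rows, and call .write on the DataFrame itself: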

    temp = final_df
    #view records from temp dataframe
    temp.show()
    
    temp.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
    .option("dbtable","TEST_SPARK").save()
    

    # collect() runs the whole computation and returns all rows to the driver
    # as a Python list of Row objects, which is stored in temp
    temp = final_df.collect() 
    
    # a list has no .write method, so call .write on the DataFrame itself
    final_df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
    .option("dbtable","TEST_SPARK").save()
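    In the snippet above, collect() and the write are two separate actions. Because Spark evaluates lazily, each action generally re-executes the full chain of transformations (including the UDFs) unless the DataFrame has been cached, so the expensive part could be paid twice. The plain-Python toy below only mimics that behaviour and is not Spark code; the LazyPipeline class is hypothetical. It counts how many times the transformation actually runs with and without caching:

```python
class LazyPipeline:
    """Toy model of lazy evaluation: stores a recipe and runs it only on an action."""

    def __init__(self, data, fn):
        self.data = list(data)
        self.fn = fn
        self.runs = 0            # how many times the transformation actually ran
        self._use_cache = False
        self._cached = None

    def cache(self):
        # like DataFrame.cache(), this only marks the pipeline; nothing runs yet
        self._use_cache = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached  # reuse the stored result
        self.runs += 1
        result = [self.fn(x) for x in self.data]
        if self._use_cache:
            self._cached = result
        return result

    # "actions": each one triggers _compute()
    def count(self):
        return len(self._compute())

    def collect(self):
        return list(self._compute())


# without caching, every action reruns the transformation
plain = LazyPipeline(range(5), lambda x: x * x)
plain.count()
plain.collect()
print("runs without cache:", plain.runs)   # -> 2

# with caching, the first action computes and stores the result
cached = LazyPipeline(range(5), lambda x: x * x).cache()
cached.count()
cached.collect()
print("runs with cache:", cached.runs)     # -> 1
```

    Spark's DataFrame.cache() is lazy in the same way: it only takes effect when the next action computes the data, and later actions can then reuse the stored result.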
    

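    Update: to time the transformations and the Snowflake write separately, cache final_df and force a full evaluation with an action such as count() before writing. show() only needs enough rows to display (20 by default), so Spark may skip much of the work, and without cache() the write would recompute everything anyway.

    import time

    # mark final_df to be kept after its first computation; this alone does no work
    final_df.cache()

    start_time = time.time()
    # count() is an action that computes every partition and fills the cache
    final_df.count()
    end_time = time.time()
    print("Total execution time for transformations: {} seconds".format(end_time - start_time))
    
    start_time_sfw = time.time()
    # the write should now read the cached result instead of rerunning the transformations
    final_df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
    .option("dbtable","TEST_SPARK").save()
    end_time_sfw = time.time()
    print("Total execution time for writing to snowflake: {} seconds".format(end_time_sfw - start_time_sfw))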
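    If you time several stages this way, a small context manager avoids repeating the start/end bookkeeping. This is a minimal sketch using only the standard library; the name timed and the timings dict are hypothetical. It uses time.perf_counter, which is designed for measuring elapsed intervals. The commented usage reuses final_df, SNOWFLAKE_SOURCE_NAME and sfOptions2 from the question:

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, timings):
    """Time the enclosed block and store the elapsed seconds in timings[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # record the duration even if the block raises (e.g. a failed write)
        elapsed = time.perf_counter() - start
        timings[label] = elapsed
        print("{}: {:.2f} seconds".format(label, elapsed))


# Hypothetical usage with the DataFrame from the question:
#
# timings = {}
# final_df.cache()
# with timed("transformations", timings):
#     final_df.count()
# with timed("write to Snowflake", timings):
#     final_df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
#         .option("dbtable", "TEST_SPARK").save()
```

    Recording in a finally clause means a stage that fails partway still reports how long it ran before the error.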