I have a nested for loop that performs operations on a data frame 10 times in the inner loop and joins the resulting 10 data frames into a single data frame once it finishes the inner loop.
UPDATE: I use a dictionary to create a list of dataframes to store each operation in and then union them at the end of the inner loop.
It then writes the result to a parquet file named after the iteration of the outer loop. The outer loop has 6 iterations and should therefore produce 6 parquet files.
It goes something like this:
train = 0
for i in range(0, 6):
    train = train + 30
    # For loop to aggregate input and create 10 output dataframes
    dfnames = {}
    for j in range(0, 10):
        ident = "_" + str(j)
        # Load dataframe of around 1M rows
        df = spark.read.parquet("s3://path")
        dfnames['df' + ident] =  # Perform aggregations and operations
    # Combine the 10 dataframes into a single df
    df_out = df_1.unionByName(df_2).unionByName(df_3)...unionByName(df_10)
    # Write to output parquet file
    df_out.write.mode('overwrite').parquet("s3://path/" + str(train) + ".parquet")
It seems to work fine until it finishes the 3rd iteration of the outer loop. Then, for some reason, it restarts the loop with another attempt ID. So I get the first 3 files, but instead of going on to the 4th iteration, it starts over and writes the first file again. I don't get any failed stages or jobs.
I have tried running the for loops alone with dummy variables and print statements (without loading the large data frames etc) and they work fine to completion. I am thinking it has something to do with the way the memory is being flushed after a loop.
These are my EMR Spark running conditions: I am running this on an EMR cluster with 5 executors, 5 driver nodes, and 10 instances with a total of 50 cores. The spark executor and driver memory is 45G each with a total of about 583G. The typical shuffle read is 250G and shuffle write is 331G.
Some of the pertinent Spark environment variables are shown below:
Is there something I am doing wrong with regards to the loop or memory management? Any insight would be greatly appreciated!
Try to not combine Python data structures with Spark data structures.
You want to convert the for loops into a map-reduce, foreach form of design.
Along with this, you can cache the data or create a Spark checkpoint in each iteration to avoid rerunning the entire DAG from scratch.
To cache your data:
df.cache()
For checkpointing:
spark.sparkContext.setCheckpointDir('<some path>')
df = df.checkpoint()  # checkpoint() returns a new DataFrame, so keep the result
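As a minimal sketch of how those two calls could be wrapped for use inside the loop: the helper name `materialize` and its `use_checkpoint` flag are hypothetical, and `df` is assumed to be a PySpark DataFrame. The checkpoint branch also assumes a checkpoint directory has already been set with `setCheckpointDir`.

```python
def materialize(df, use_checkpoint=False):
    """Return a DataFrame that Spark can reuse instead of recomputing.

    `materialize` is a hypothetical helper; `df` is assumed to be a
    pyspark.sql.DataFrame.
    """
    if use_checkpoint:
        # checkpoint() returns a NEW DataFrame with a truncated lineage.
        # The original df is unchanged, so the return value must be kept.
        return df.checkpoint(eager=True)
    # cache() is lazy: it only marks df for reuse and returns the same
    # DataFrame. Nothing is stored until an action runs on it.
    return df.cache()
```

Inside the outer loop this might be used as `df_out = materialize(df_out, use_checkpoint=True)` just before the write. Whether that is enough to stop the restart described in the question is not something this sketch establishes.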
You should see performance and scalability improve once you use Spark constructs instead of Python constructs. For example, replace your for loop with foreach, and replace the union of a list with a map-reduce.
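One way to read "replace union of a list by map reduce" is to fold the collected DataFrames with `functools.reduce` rather than writing the `unionByName` chain by hand. The sketch below assumes each element exposes `unionByName`, as PySpark DataFrames do; the helper name `union_all` is hypothetical.

```python
from functools import reduce


def union_all(frames):
    """Fold an iterable of DataFrames into one using unionByName.

    `union_all` is a hypothetical helper; each element of `frames` is
    assumed to be a pyspark.sql.DataFrame with matching column names.
    """
    frames = list(frames)
    if not frames:
        raise ValueError("need at least one DataFrame to union")
    # reduce applies unionByName pairwise, left to right:
    # ((f0 ∪ f1) ∪ f2) ∪ ... ∪ fn
    return reduce(lambda left, right: left.unionByName(right), frames)
```

With the dictionary from the question this would be called as `df_out = union_all(dfnames.values())`. Note that the fold still runs on the driver and only builds the query plan; the unions themselves are executed by Spark when the result is written.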