Tags: apache-spark, pyspark, memory-leaks, nested-loops, amazon-emr

For Loop keeps restarting in EMR (pyspark)


I have a nested for loop that performs operations on a data frame 10 times in the inner loop and unions the resulting 10 data frames into a single data frame once the inner loop finishes.

UPDATE: I use a dictionary to store the dataframe produced by each inner-loop iteration and then union them at the end of the inner loop.

It then writes the result to a parquet file named with the iteration number of the outer loop. The outer loop has 6 iterations and should therefore produce 6 parquet files.

It goes something like this:

train=0
for i in range(0,6):
    train=train+30
    #For loop to aggregate input and create 10 output dataframes
    dfnames={}
    for j in range(0,10):
        ident="_"+str(j)  
        #Load dataframe of around 1M rows
        df=spark.read.parquet("s3://path")
        dfnames['df'+ident]= #Perform aggregations and operations
    #Combine the 10 dataframes into a single df
    df_out=df_1.unionByName(df_2).unionByName(df_3)...unionByName(df_10)
    #Write to output parquet file
    df_out.write.mode('overwrite').parquet("s3://path/" + str(train) + ".parquet")

It seems to work fine until it finishes the 3rd iteration of the outer loop. Then, for some reason, it restarts the loop with another attempt ID. So I get the first 3 files, but instead of going on to the 4th iteration, it restarts and produces the first file all over again. I don't get any failed stages or jobs.

I have tried running the for loops alone with dummy variables and print statements (without loading the large dataframes etc.) and they run to completion fine. I suspect it has something to do with the way memory is flushed after a loop.

These are my EMR Spark running conditions: I am running this on an EMR cluster with 5 executors, 5 driver nodes, and 10 instances with a total of 50 cores. The Spark executor and driver memory is 45G each, with a total of about 583G. The typical shuffle read is 250G and shuffle write is 331G.

Some of the pertinent Spark environment variables are shown below:

[screenshot of Spark environment variables]

Is there something I am doing wrong with regards to the loop or memory management? Any insight would be greatly appreciated!


Solution

  • Try not to mix Python data structures with Spark data structures.

    You want to convert the for loops into a map-reduce / foreach style of design.

    Along with this, you can cache or checkpoint the dataframe in each iteration to avoid rerunning the entire DAG from scratch.

    To cache your data:

    df.cache()
    

    For checkpointing:

    spark.sparkContext.setCheckpointDir('<some path>')
    df = df.checkpoint()  # checkpoint() returns a new DataFrame, so reassign it
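
    Inside your loop, that could look roughly like the sketch below (the S3 checkpoint location is an assumption; any durable HDFS/S3 path works). The idea is to checkpoint the combined dataframe once per outer iteration, before the write, and reassign it, so the next iteration does not drag the previous lineage along:

    # Once, before the outer loop (assumed checkpoint location)
    spark.sparkContext.setCheckpointDir("s3://path/checkpoints")

    # Inside each outer-loop iteration, after the 10 dataframes are combined:
    df_out = df_out.checkpoint()  # truncates the lineage; returns a new DataFrame
    df_out.write.mode('overwrite').parquet("s3://path/" + str(train) + ".parquet")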
    

    These will show performance and scale improvements once you use Spark constructs instead of Python constructs. For example, replace your for loop with foreach, and replace the hand-written chain of unions with a reduce over the list of dataframes, as sketched below.
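
    For example, a minimal sketch of the inner loop rewritten with a list and functools.reduce (the input path and the groupBy aggregation are placeholders standing in for your own operations):

    from functools import reduce
    from pyspark.sql import DataFrame

    dfs = []
    for j in range(10):
        df = spark.read.parquet("s3://path")          # placeholder input
        dfs.append(df.groupBy("some_key").count())    # placeholder aggregation

    # One reduce call replaces the hand-written chain of ten unionByName calls
    df_out = reduce(DataFrame.unionByName, dfs)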