Search code examples
pythonapache-sparkpysparkparquet

Out of memory when trying to persist a dataframe


I am facing an out of memory error when trying to persist a dataframe and I don't really understand why. I have a dataframe of roughly 20Gb with 2.5 millions rows and around 20 columns. After filtering this dataframe, I have 4 columns and 0.5 million rows.

Now my problem is that when I persist the filtered dataframe I get an out of memory error (exceeds 25.4Gb of 20 Gb physical memory used). I have tried persisting at different storage levels

df = spark.read.parquet(path) # 20 Gb
df_filter = df.select('a', 'b', 'c', 'd').where(df.a == something) # a few Gb
df_filter.persist(StorageLevel.MEMORY_AND_DISK) 
df_filter.count()

My cluster has 8 nodes with 30Gb of memory each.

Do you have any idea where that OOM could come from ?


Solution

  • Just some suggestions to help identify root cause ...

    You probably have either (or a combo) of ...

    1. skewed source data partition split sizes which is tough to deal with and cause garbage collection, OOM, etc. (these methods have helped me, but there may be better approaches per use case)
    # to check num partitions
    df_filter.rdd.getNumPartitions()
    
    # to repartition (**does cause shuffle**) to increase parallelism and help with data skew
    df_filter.repartition(...) # monitor/debug performance in spark ui after setting
    
    1. too little/too many executors/ram/cores set in config
    # check via
    spark.sparkContext.getConf().getAll()
    
    # these are the ones you want to watch out for
    '''
    --num-executors
    --executor-cores
    --executor-memory
    '''
    
    1. wide transformation shuffles size too little/too many => try general debug checks to view transformations that will be triggered when persisting + find their # of output partitions to disk
    # debug directed acyclic graph [dag]
    df_filter.explain() # also "babysit" in spark UI to examine performance of each node/partitions to get specs when you are persisting
    
    # check output partitions if shuffle occurs
    spark.conf.get("spark.sql.shuffle.partitions")