I am facing an out-of-memory error when trying to persist a dataframe, and I don't really understand why. I have a dataframe of roughly 20 GB with 2.5 million rows and around 20 columns. After filtering this dataframe, I have 4 columns and 0.5 million rows.
My problem is that when I persist the filtered dataframe, I get an out-of-memory error (the container exceeds its limit: 25.4 GB of 20 GB physical memory used). I have tried persisting at different storage levels.
from pyspark import StorageLevel

df = spark.read.parquet(path)  # ~20 GB
df_filter = df.select('a', 'b', 'c', 'd').where(df.a == something)  # a few GB after filtering
df_filter.persist(StorageLevel.MEMORY_AND_DISK)
df_filter.count()  # action that materializes the persisted data
My cluster has 8 nodes with 30 GB of memory each.
Do you have any idea where that OOM could come from?
Just some suggestions to help identify the root cause...
You probably have one (or a combination) of the following: too few partitions, data skew, or executor settings that don't fit your cluster. A few things to check:
# check the number of partitions
df_filter.rdd.getNumPartitions()
# repartition (note: this triggers a shuffle) to increase parallelism and mitigate data skew
df_filter.repartition(...)  # monitor/debug performance in the Spark UI after setting this
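To see whether the existing partitions are balanced before repartitioning, one quick sketch is to count rows per partition with the built-in spark_partition_id; the "pid" alias is just a label I chose, and show() is cheap at 0.5 million rows:

# rows per partition -- a heavily skewed distribution suggests repartitioning will help
from pyspark.sql.functions import spark_partition_id
df_filter.groupBy(spark_partition_id().alias("pid")).count().orderBy("pid").show()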
# check your current executor configuration via
spark.sparkContext.getConf().getAll()
# these are the spark-submit flags you want to watch out for
'''
--num-executors
--executor-cores
--executor-memory
'''
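If those flags look undersized, the equivalent settings can also be supplied when building the session. The values below are purely illustrative, not a recommendation (size them to your 8 x 30 GB nodes); note that a YARN kill like "25.4 GB of 20 GB physical memory used" often means the off-heap overhead, not the heap, is too small:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.executor.instances", "8")        # illustrative value
         .config("spark.executor.cores", "4")            # illustrative value
         .config("spark.executor.memory", "16g")         # illustrative value
         .config("spark.executor.memoryOverhead", "4g")  # off-heap overhead; a frequent culprit in YARN OOM kills
         .getOrCreate())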
# debug the directed acyclic graph (DAG)
df_filter.explain()  # also "babysit" the job in the Spark UI to examine the performance of each node/partition while persisting
# check the number of output partitions used when a shuffle occurs
spark.conf.get("spark.sql.shuffle.partitions")
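Since the filtered dataframe is only a few GB, the default of 200 shuffle partitions may not suit it, and you can override it per session. A hedged example where the value 64 is arbitrary, not a recommendation:

# tune to the post-filter data size; 64 is just an example value
spark.conf.set("spark.sql.shuffle.partitions", "64")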