Tags: apache-spark, pyspark, amazon-emr, graphframes

Spark GraphFrames: large dataset and memory issues


I want to run PageRank on a relatively large graph: 3.5 billion nodes and 90 billion edges. I have been experimenting with different cluster sizes to get it to run. But first, the code:

from pyspark.sql import SparkSession
import graphframes

spark = SparkSession.builder.getOrCreate()

edges_DF = spark.read.parquet('s3://path/to/edges') # 1.4TB total size
verts_DF   = spark.read.parquet('s3://path/to/verts') # 25GB total size

graph_GDF = graphframes.GraphFrame(verts_DF, edges_DF)
graph_GDF = graph_GDF.dropIsolatedVertices()

result_df   = graph_GDF.pageRank(resetProbability=0.15, tol=0.1)
pagerank_df = result_df.vertices
pagerank_df.write.parquet('s3://path/to/output', mode='overwrite')

I experienced high garbage collection times right from the start, so I experimented with different settings and cluster sizes. I mainly followed two articles:

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

I run the cluster on Amazon EMR. These are the relevant settings I currently use:

"spark.jars.packages": "org.apache.hadoop:hadoop-aws:2.7.6,graphframes:graphframes:0.7.0-spark2.4-s_2.11",
"spark.dynamicAllocation.enabled": "false",
"spark.network.timeout":"1600s",
"spark.executor.heartbeatInterval":"120s",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.serializer":"org.apache.spark.serializer.KryoSerializer",
"spark.sql.shuffle.partitions":"1216"

"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"

"maximizeResourceAllocation": "true"

"fs.s3.maxConnections": "5000",
"fs.s3.consistent": "true",
"fs.s3.consistent.throwExceptionOnInconsistency":"false",
"fs.s3.consistent.retryPolicyType":"fixed",
"fs.s3.consistent.retryPeriodSeconds":"10"

I experimented with cluster sizes. My first experiment that seemed to work was a cluster with the following parameters: --deploy-mode cluster --num-executors 75 --executor-cores 5 --executor-memory 36g --driver-memory 36g --driver-cores 5

With this configuration GC time was way down and everything was working, but since it was a test cluster it had very "little" memory, 2.7 TB in total. After a while I also got ExecutorLostFailure (executor 54 exited caused by one of the running tasks) Reason: Container from a bad node Exit status: 137, which I thought happened because I had left the node too little RAM. So I reran the whole thing, this time with --executor-cores 5 --executor-memory 35g, and right away my GC problems were back and the cluster acted really weird. So I thought I understood the problem: the reason for the high GC times was not enough memory per executor.
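To make the per-node memory reasoning above concrete, here is a rough sketch of how much memory YARN is asked for per executor and how exit status 137 decodes. It assumes Spark's default overhead rule of max(384 MiB, 10% of executor memory), which the question does not state and which may be overridden on a given cluster; the function names are made up for illustration, and any rounding YARN applies to the request is not modeled.

```python
# Assumption: default spark.executor.memoryOverhead, i.e. the larger of
# 384 MiB and 10% of --executor-memory. Not taken from the question.
OVERHEAD_PERCENT = 10
MIN_OVERHEAD_MIB = 384


def container_request_mib(executor_memory_gib: int) -> int:
    """Heap plus overhead requested from YARN for one executor, in MiB."""
    heap_mib = executor_memory_gib * 1024
    overhead_mib = max(MIN_OVERHEAD_MIB, heap_mib * OVERHEAD_PERCENT // 100)
    return heap_mib + overhead_mib


def signal_from_exit_status(status: int):
    """Exit statuses above 128 conventionally mean 'killed by signal (status - 128)'."""
    return status - 128 if status > 128 else None


for mem_gib in (36, 35):
    request_gib = container_request_mib(mem_gib) / 1024
    print(f"--executor-memory {mem_gib}g -> container request of about {request_gib:.1f} GiB")

# 137 - 128 = 9, which is SIGKILL: the container was killed from outside the JVM.
print("exit status 137 -> signal", signal_from_exit_status(137))
```

Under these assumptions a 36g executor asks for roughly 39.6 GiB, so the heap size alone understates what each node has to provide.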

The next cluster I spun up had the following parameters: --deploy-mode cluster --num-executors 179 --executor-cores 5 --executor-memory 45g --driver-memory 45g --driver-cores 5

So a larger cluster and even more memory per executor than before. Everything was running smoothly, and I noticed via Ganglia that the first step took about 5.5 TB of RAM.

I thought I understood the issue: giving my cluster fewer cores and enlarging the memory of each executor makes the program faster. I guessed that it has to do with verts_DF being about 25 GB in size, so that this way it would fit into the memory of each executor and leave room for the calculations (25 GB × 179 is about 4.5 TB, in the same range as the 5.5 TB I saw).

So the next cluster I spun up had the same number of nodes, but I resized the executors to: --num-executors 119 --executor-cores 5 --executor-memory 75g

Instantly all the problems were back! High GC times, the cluster was hanging, and via Ganglia I could see the RAM filling up to 8 of the 9 available TB. I was baffled. I went back and spun up the --num-executors 179 --executor-cores 5 --executor-memory 45g cluster again, which luckily is easy to do with EMR because I could just clone it. But now this configuration did not work either: high GC times, and the cluster hit 8 TB of used memory right away.
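The totals quoted for the three configurations can be checked with a few lines of arithmetic. This sketch only multiplies the numbers given above; it treats "g" as decimal gigabytes for simplicity and ignores driver memory and overhead, so it is an approximation rather than what Ganglia would report.

```python
# (num_executors, executor_memory_gb) for the three runs described above.
configs = {
    "75 x 36g": (75, 36),
    "179 x 45g": (179, 45),
    "119 x 75g": (119, 75),
}


def total_heap_tb(num_executors: int, executor_memory_gb: int) -> float:
    """Sum of all executor heaps in TB (1 TB = 1000 GB here)."""
    return num_executors * executor_memory_gb / 1000


totals = {name: total_heap_tb(n, mem) for name, (n, mem) in configs.items()}
for name, tb in totals.items():
    print(f"{name}: {tb:.2f} TB of executor heap")

# One full copy of the 25 GB vertex table per executor on the 179-executor cluster.
verts_copies_tb = 25 * 179 / 1000
print(f"25 GB x 179 executors: {verts_copies_tb:.2f} TB")
```

The first and third results line up with the 2.7 TB and roughly 9 TB mentioned in the text, while 25 GB times 179 comes to about 4.5 TB, somewhat below the 5.5 TB observed.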

What is going on here? It feels like I am playing roulette: sometimes the same config works and other times it does not.


Solution

  • In case someone still stumbles upon this after some time has passed: I realized that the problem lies with how GraphX or GraphFrames load the graph. Both try to generate all triplets of the graph they are loading, which with very large graphs results in OOM errors, because a graph with 3.5 billion nodes and 70 billion edges has a huge number of them. I wrote a solution by implementing PageRank in PySpark. It is surely not as fast as a Scala implementation, but it scales and does not run into the triplet problem described above. I published it on GitHub: https://github.com/thagorx/spark_pagerank
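For readers who want to see the general idea of computing PageRank from a plain edge list without materializing triplets, here is a minimal pure-Python sketch. It is not the code from the linked repository, which I have not reproduced; it is the textbook iteration, with made-up names, and its numbers will not necessarily match what GraphFrames returns. In a DataFrame setting, the inner loop would presumably become a join of edges with the current ranks on the source column followed by a group-by sum on the destination column.

```python
from collections import defaultdict


def pagerank(edges, reset_probability=0.15, max_iter=20):
    """Textbook PageRank over (src, dst) pairs; returns {vertex: rank}, ranks sum to 1."""
    edges = list(edges)
    vertices = {v for edge in edges for v in edge}
    n = len(vertices)

    # Out-degree per source vertex (a group-by count on src).
    out_degree = defaultdict(int)
    for src, _ in edges:
        out_degree[src] += 1

    ranks = {v: 1.0 / n for v in vertices}
    for _ in range(max_iter):
        # Each edge passes rank(src) / out_degree(src) to dst
        # (join edges with ranks on src, then sum per dst).
        incoming = defaultdict(float)
        for src, dst in edges:
            incoming[dst] += ranks[src] / out_degree[src]

        # Rank held by vertices without outgoing edges is spread evenly.
        dangling = sum(ranks[v] for v in vertices if out_degree[v] == 0)

        ranks = {
            v: reset_probability / n
            + (1 - reset_probability) * (incoming.get(v, 0.0) + dangling / n)
            for v in vertices
        }
    return ranks


if __name__ == "__main__":
    print(pagerank([("a", "c"), ("b", "c"), ("c", "a")]))
```

Only per-vertex ranks and per-edge contributions are kept at any time, which is the property that matters for very large graphs; a fixed iteration count is used here instead of a tolerance to keep the sketch short.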