Hi, I have created a graph from vertex and edge files. The graph is 600 GB in size. I am querying it using the motif-finding feature of Spark GraphFrames, and I have set up an AWS EMR cluster to run the queries.
Cluster details: 1 master and 8 slaves
Master node:
m5.xlarge
4 vCores, 16 GiB memory, EBS-only storage
EBS storage: 64 GiB
Slave node:
m5.4xlarge
16 vCores, 64 GiB memory, EBS-only storage
EBS storage: 256 GiB (per instance)
I am seeing very high shuffle read (3.4 TB) and shuffle write (2 TB). This hurts performance: it takes around 50 minutes to execute only 10 queries. Is there any way to reduce such high shuffle?
This is my Spark code:
import org.apache.spark.sql.SparkSession
import org.graphframes.GraphFrame

val spark = SparkSession.builder.appName("SparkGraph POC").getOrCreate()
// vertexDf and edgeDf are the DataFrames built from the vertex and edge files (loading code not shown)
val g: GraphFrame = GraphFrame(vertexDf, edgeDf)

// Query 1: two-hop chains where both edges are 'knows'
val q1 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c)")
q1.filter(
    " r1.relationship = 'knows' and" +
    " r2.relationship = 'knows'").distinct()
  .createOrReplaceTempView("q1table")
spark.sql("select a.id as a_id,a.name as a_name," +
    "b.id as b_id,b.name as b_name," +
    "c.id as c_id,c.name as c_name from q1table")
  .write
  .option("quote", "\"")
  .option("escape", "\"")
  .option("header", "true")
  .csv(resFilePath + "/q1")
spark.catalog.uncacheTable("q1table")

// Query 2: four-hop 'knows' chains from user1 to user4
val q2 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c); (c)-[r3]->(d); (d)-[r4]->(e)")
q2.filter(
    " a.name = 'user1' and" +
    " e.name = 'user4' and" +
    " r1.relationship = 'knows' and" +
    " r2.relationship = 'knows' and" +
    " r3.relationship = 'knows' and" +
    " r4.relationship = 'knows'").distinct()
  .createOrReplaceTempView("q2table")
spark.sql("select a.id as a_id, a.name as a_name ," +
    "e.id as e_id, e.name as e_name from q2table")
  .write
  .option("quote", "\"")
  .option("escape", "\"")
  .option("header", "true")
  .csv(resFilePath + "/q2")
spark.catalog.uncacheTable("q2table")

spark.stop()
The problem with the GraphFrames implementation is that it self-joins its internal DataFrames once for each additional element you use in the motif. That means you will get more and more shuffle as the length of the chain increases.
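To illustrate the self-join point, here is a minimal sketch in plain Spark DataFrames of roughly what a two-hop motif such as (a)-[r1]->(b); (b)-[r2]->(c) amounts to. This is not the actual GraphFrames internals. The object name TwoHopJoinSketch, the sample edges and the local master are made up for illustration; only the src, dst and relationship column names come from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper object, for illustration only.
object TwoHopJoinSketch {

  // Expresses a two-hop 'knows' chain as one self-join of the edge table.
  def twoHop(edges: DataFrame): DataFrame = {
    // Filter first, so only 'knows' edges take part in the join.
    val knows = edges.filter(col("relationship") === "knows")

    // The same edge table plays two roles: first hop (a -> b) and second hop (b -> c).
    val r1 = knows.select(col("src").as("a"), col("dst").as("b"))
    val r2 = knows.select(col("src").as("b"), col("dst").as("c"))

    // One join per additional hop; a four-hop chain would need three of these.
    r1.join(r2, "b").select("a", "b", "c")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("TwoHopJoinSketch")
      .master("local[*]") // local master only for this illustration
      .getOrCreate()
    import spark.implicits._

    // Tiny made-up edge list.
    val edges = Seq(
      ("u1", "u2", "knows"),
      ("u2", "u3", "knows"),
      ("u2", "u4", "likes")
    ).toDF("src", "dst", "relationship")

    // The only two-hop 'knows' chain in this data is u1 -> u2 -> u3.
    twoHop(edges).show()

    spark.stop()
  }
}
```

Each extra hop adds another join over the edge data, and on a 600 GB graph each of those joins can shuffle a large share of it.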
You can see more details at https://www.waitingforcode.com/apache-spark-graphframes/motifs-finding-graphframes/read
I have also tried a similar approach and found that when the length of the chain is greater than 12, Spark becomes unresponsive and connections to the executors are lost, even after I increased resources.
If you are trying to query chains that long, I would recommend using a graph database instead.
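For shorter chains like the two queries in the question, one thing that might be worth trying first is to shrink the edge set before calling find, since every hop in both queries requires relationship = 'knows'. The sketch below assumes the same id/name and src/dst/relationship columns as the question. The object name PrefilterSketch, the sample rows and the local master are made up for illustration. Spark's optimizer may already push some of these filters down by itself, so treat this as something to measure, not a guaranteed win.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.graphframes.GraphFrame

// Hypothetical helper object, for illustration only.
object PrefilterSketch {

  // Builds a graph that contains only 'knows' edges, so the motif joins see less data.
  def knowsOnly(vertexDf: DataFrame, edgeDf: DataFrame): GraphFrame =
    GraphFrame(vertexDf, edgeDf.filter("relationship = 'knows'"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("PrefilterSketch")
      .master("local[*]") // local master only for this illustration
      .getOrCreate()
    import spark.implicits._

    // Tiny made-up vertex and edge data.
    val vertexDf = Seq(("1", "user1"), ("2", "user2"), ("3", "user3")).toDF("id", "name")
    val edgeDf = Seq(
      ("1", "2", "knows"),
      ("2", "3", "knows"),
      ("1", "3", "likes")
    ).toDF("src", "dst", "relationship")

    val g = knowsOnly(vertexDf, edgeDf)

    // No relationship filter is needed after find: every remaining edge is 'knows'.
    g.find("(a)-[r1]->(b); (b)-[r2]->(c)")
      .selectExpr("a.id as a_id", "b.id as b_id", "c.id as c_id")
      .show()

    spark.stop()
  }
}
```

The vertex DataFrame is left untouched here, because the intermediate vertices of a chain cannot be filtered by name without changing the results.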
Hope this helps