I am using Spark/GraphFrames from Python and from R. When I call PageRank on a small graph from Python, it is a lot slower than with R. Why is it so much slower with Python, considering that both Python and R are calling the same libraries?
I'll try to demonstrate the problem below.
Spark/GraphFrames includes examples of graphs, such as friends, as described on this link. This is a very small directed graph with 6 nodes and 8 edges (note that the example is not the same compared to other versions of GraphFrames).
When I run the following piece of code with R, it takes almost no time to calculate PageRank:
nodes <- read.csv('nodes.csv')
edges <- read.csv('edges.csv')
sc <- spark_connect(master = "local", version = "2.1.1")
nodes_tbl <- copy_to(sc, nodes)
edges_tbl <- copy_to(sc, edges)
graph <- gf_graphframe(nodes_tbl, edges_tbl)
ranks <- gf_pagerank(graph, reset_probability = 0.15, tol = 0.01)
results <- as.data.frame(ranks$vertices)
results <- arrange(results, id)
results$pagerank <- results$pagerank / sum(results$pagerank)
When I run the equivalent with PySpark, it takes 10 to 30 minutes:
from pyspark.sql import SparkSession
from graphframes.examples import Graphs
if __name__ == '__main__':
    sc = SparkSession.builder.master("local").getOrCreate()
    g = Graphs(sc).friends()
    results = g.pageRank(resetProbability=0.15, tol=0.01)
    results.vertices.select("id", "pagerank").show()
    results.edges.select("src", "dst", "weight").show()
I tried different versions of Spark and GraphFrames for Python to align them with the R settings.
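In general, when you see such significant runtime differences between apparently equivalent pieces of code in different backends, you have to consider two possibilities: either the code is not really equivalent by the time it reaches the JVM, or the configuration and data distribution differ.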
In this particular case the first and the most obvious reason is how you load the data.
In sparklyr, copy_to uses only a single partition by default. With such small data this is often beneficial, as parallelization / distribution overhead can be much higher than the computation cost, but it can also lead to miserable failures.
In PySpark, the friends loader uses standard parallelize, which means that the number of partitions is taken from defaultParallelism. Based on the master configuration the value is at least 1, but it can be affected by configuration options not visible here (like spark.default.parallelism).
However, as far as I can tell, these options shouldn't affect the runtime in this particular case. Moreover, the path the code takes before it reaches the JVM backend doesn't seem to differ enough between the two cases to explain the difference.
This suggests that the problem lies somewhere in the configuration. In general there are at least two options which can significantly affect data distribution, and therefore the execution time:
- spark.default.parallelism: used with the RDD API to determine the number of partitions in different cases, including the default post-shuffle distribution. For possible implications see for example Spark iteration time increasing exponentially when using join. It doesn't look like it affects your code here.
- spark.sql.shuffle.partitions: used with the Dataset API to determine the number of partitions after a shuffle (groupBy, join, etc.).
While the PageRank code uses the old GraphX API, and this parameter is not directly applicable there, the preparation that happens before data is passed to the older API involves indexing edges and vertices with the Dataset API. If you check the source you'll see that both indexedEdges and indexVertices use joins, and therefore depend on spark.sql.shuffle.partitions.
Furthermore, the number of partitions set by the aforementioned methods will be inherited by the GraphX Graph object, significantly affecting execution time.
If you set spark.sql.shuffle.partitions to a minimum value:
spark: SparkSession
spark.conf.set("spark.sql.shuffle.partitions", 1)
the execution time on such small data should be negligible.
Your environments are likely to use different values of spark.sql.shuffle.partitions.
General Directions:
If you see behavior like this, and want to roughly narrow down the problem you should take a look at the Spark UI, and see where things diverge. In this case you're likely to see significantly different numbers of tasks.