
How does the filter operation of Spark work on GraphX edges?


I'm very new to Spark and don't really know the basics; I just jumped into it to solve a problem. The solution to the problem involves building a graph (using GraphX) whose edges have a string attribute. A user may wish to query this graph, and I handle the queries by keeping only those edges whose string attribute equals the user's query.

Now, my graph has more than 16 million edges; it takes more than 10 minutes to create the graph when I'm using all 8 cores of my computer. However, when I query this graph (like I mentioned above), I get the results instantaneously (to my pleasant surprise).

So, my question is, how exactly does the filter operation search for my queried edges? Does it look at them iteratively? Are the edges being searched for on multiple cores and it just seems very fast? Or is there some sort of hashing involved?

Here is an example of how I'm using filter: Mygraph.edges.filter(_.attr(0).equals("cat")) which means that I want to retrieve edges that have the attribute "cat" in them. How are the edges being searched?


Solution

  • How can the filter results be instantaneous?

    Your statement returns so fast because it doesn't perform the filtering yet. Spark uses lazy evaluation: it doesn't execute transformations until you call an action that needs the results. Calling a transformation method like filter just creates a new RDD that represents the transformation and its result. You have to perform an action like collect or count to have it executed:

    import org.apache.spark.graphx.Graph

    // Vertex type Int is an assumption; the edge attribute is taken to be a String
    def myGraph: Graph[Int, String] = ???
    
    // No filtering happens here yet: the results aren't needed, so Spark stays lazy and does nothing
    // (the predicate is illustrative; substitute your own)
    val filteredEdges = myGraph.edges.filter(_.attr == "cat")
    
    // Counting how many edges are left requires the results to be materialized, so this fires off the actual filtering
    println(filteredEdges.count())
    
    // Gathering all results also requires the filtering to be done
    val collectedFilteredEdges = filteredEdges.collect()
    

    Note that in these examples the filter results are not stored in between: because of the laziness, the filtering is repeated for each of the two actions. To prevent that duplication, look into Spark's caching functionality, after reading up on transformations and actions and on what Spark does behind the scenes: https://spark.apache.org/docs/latest/programming-guide.html#rdd-operations.
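
    As a rough sketch of what caching could look like here: the toy edges, the object name CachedFilterSketch and the helper catEdges are made up for illustration, and the edge attribute is assumed to be a plain String. cache() is itself lazy; the first action fills the cache, and later actions normally read from it instead of filtering again (Spark may still recompute partitions that were evicted from memory).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

object CachedFilterSketch {
  // Hypothetical helper: returns the count and the collected "cat" edges.
  def catEdges(sc: SparkContext): (Long, Array[Edge[String]]) = {
    // Toy data standing in for the real 16-million-edge graph.
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, "cat"),
      Edge(2L, 3L, "dog"),
      Edge(3L, 1L, "cat")
    ))
    val graph = Graph.fromEdges(edges, defaultValue = 0)

    // Still lazy: cache() only marks the filtered RDD for in-memory storage.
    val filtered = graph.edges.filter(_.attr == "cat").cache()

    // First action: runs the filter and stores the resulting partitions.
    val n = filtered.count()
    // Second action: served from the cached partitions where available.
    val collected = filtered.collect()

    // Release the cached partitions once they are no longer needed.
    filtered.unpersist()
    (n, collected)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("cached-filter-sketch")
    val sc = new SparkContext(conf)
    try {
      val (n, collected) = catEdges(sc)
      println(s"$n matching edges: ${collected.mkString(", ")}")
    } finally {
      sc.stop()
    }
  }
}
```

    Whether caching pays off depends on how often you reuse the same filtered result; for one-off queries it only costs memory.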

  • How exactly does the filter operation search for my queried edges (when I execute an action)?

    In Spark GraphX the edges are stored in an RDD of type EdgeRDD[ED], where ED is the type of your edge attribute, in your case String. This special RDD does some optimizations in the background, but for your purposes it behaves like its superclass RDD[Edge[ED]], and filtering works like filtering any RDD: it iterates through all items, applying the given predicate to each. An RDD is, however, split into a number of partitions, and Spark filters multiple partitions in parallel. In your case, where you seem to run Spark locally, it processes as many partitions in parallel as you have cores, or as many as you have specified explicitly, with --master local[4] for instance.
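
    To make the per-partition scan visible, here is a small sketch that counts the matching edges in each partition separately. The data, the object name PartitionScanSketch and the helper matchesPerPartition are hypothetical; only parallelize, Graph.fromEdges and mapPartitionsWithIndex are actual Spark/GraphX calls. Each partition is walked edge by edge, and with local[4] up to four partitions are processed at the same time.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

object PartitionScanSketch {
  // Hypothetical helper: (partition index, number of "cat" edges in that partition).
  def matchesPerPartition(sc: SparkContext): Array[(Int, Int)] = {
    // Twelve toy edges, explicitly spread over four partitions.
    val labels = Array("cat", "dog", "bird")
    val edges = sc.parallelize(
      (0L until 12L).map(i => Edge(i, i + 1, labels((i % 3).toInt))),
      numSlices = 4
    )
    val graph = Graph.fromEdges(edges, defaultValue = 0)

    // For every partition, iterate over its edges and count those that match.
    graph.edges
      .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.count(_.attr == "cat"))) }
      .collect()
  }

  def main(args: Array[String]): Unit = {
    // local[4]: at most four partitions are processed concurrently.
    val conf = new SparkConf().setMaster("local[4]").setAppName("partition-scan-sketch")
    val sc = new SparkContext(conf)
    try {
      matchesPerPartition(sc).foreach { case (idx, n) =>
        println(s"partition $idx: $n matching edges")
      }
    } finally {
      sc.stop()
    }
  }
}
```

    A plain filter does the same linear walk per partition; it just keeps the matching edges instead of counting them.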

    The RDD with edges is partitioned based on the PartitionStrategy that is set, for instance when you create your graph with Graph.fromEdgeTuples or call partitionBy on your graph. All strategies are based on the edge's vertices, however, so they have no knowledge of your attribute and don't affect your filtering operation. The one possible exception is some unbalanced network load if you run it on a cluster, all 'cat' edges end up in the same partition/executor, and you then do a collect or some shuffle operation. See the GraphX docs on Vertex and Edge RDDs for a bit more information on how graphs are represented and partitioned.
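
    A quick way to convince yourself that the partition strategy doesn't change what the filter finds is to run the same filter before and after repartitioning. This is only a sketch on made-up data; the object name PartitionStrategySketch and the helper countsBeforeAndAfter are hypothetical, and EdgePartition2D is just one of the built-in strategies, picked arbitrarily.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

object PartitionStrategySketch {
  // Hypothetical helper: number of "cat" edges before and after repartitioning.
  def countsBeforeAndAfter(sc: SparkContext): (Long, Long) = {
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, "cat"),
      Edge(2L, 3L, "dog"),
      Edge(3L, 4L, "cat"),
      Edge(4L, 1L, "bird")
    ))
    val graph = Graph.fromEdges(edges, defaultValue = 0)

    // Redistribute the edges based on their source and destination vertex IDs.
    val repartitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)

    // The predicate only looks at the attribute, so both counts should agree.
    val before = graph.edges.filter(_.attr == "cat").count()
    val after = repartitioned.edges.filter(_.attr == "cat").count()
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("partition-strategy-sketch")
    val sc = new SparkContext(conf)
    try {
      val (before, after) = countsBeforeAndAfter(sc)
      println(s"before=$before after=$after")
    } finally {
      sc.stop()
    }
  }
}
```

    What can differ between strategies is which partition each matching edge lives in, and that only matters for load balance, not for the result.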