Tags: apache-spark, spark-graphx, connected-components

Spark: GraphX fails to find connected components in graphs with few edges and long paths


I'm new to Spark and GraphX and have been experimenting with its connected components algorithm. I noticed that the structure of the graph seems to have a strong impact on performance.

It handled graphs with millions of vertices and edges, but for a certain class of graphs the algorithm did not finish at all and eventually failed with an OutOfMemoryError: GC overhead limit exceeded.

The algorithm seems to have problems with graphs that contain long paths. For instance, the computation fails for the path graph { (i,i+1) | i <- {1..200} }. However, when I added transitive edges, the computation finished immediately:

{ (i,j) | i <- {1..200}, j <- {i+1..200} }

Star-shaped graphs like this were also no problem:

{ (i,1) | i <- {1..200} }
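
For reference, the three test graphs can be generated with a few lines of plain Scala (the writeEdges helper and the file names are my own choices; the output is the one-edge-per-line format that GraphLoader.edgeListFile expects):

import java.io.PrintWriter

// Write one edge per line ("src dst"), the format GraphLoader.edgeListFile reads.
def writeEdges(path: String, edges: Seq[(Int, Int)]): Unit = {
  val out = new PrintWriter(path)
  try edges.foreach { case (s, d) => out.println(s"$s $d") }
  finally out.close()
}

val n = 200
// Path graph { (i,i+1) | i <- {1..200} }: triggers the failure.
writeEdges("path.graph", for (i <- 1 to n) yield (i, i + 1))
// Transitive edges { (i,j) | i <- {1..200}, j <- {i+1..200} }: finishes immediately.
writeEdges("transitive.graph", for (i <- 1 to n; j <- i + 1 to n) yield (i, j))
// Star graph { (i,1) | i <- {1..200} }: also no problem.
writeEdges("star.graph", for (i <- 1 to n) yield (i, 1))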

Here is a minimal example to reproduce the problem:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable

object Matching extends Logging {

  def main(args: Array[String]): Unit = {
    // args(0) is the edge list file; the remaining args are --key=value options
    val fname = if (args.nonEmpty) args(0) else "input.graph"
    val optionsList = args.drop(1).map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => opt -> v
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }
    val options = mutable.Map(optionsList: _*)

    val conf = new SparkConf()
    GraphXUtils.registerKryoClasses(conf)

    val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
      .map(PartitionStrategy.fromString(_))
    val edgeStorageLevel = options.remove("edgeStorageLevel")
      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
    val vertexStorageLevel = options.remove("vertexStorageLevel")
      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)

    val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
    val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
      edgeStorageLevel = edgeStorageLevel,
      vertexStorageLevel = vertexStorageLevel).cache()
    log.info("Loading graph...")
    val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
    log.info("Loading graph...done")

    log.info("Computing connected components...")
    val cc = ConnectedComponents.run(graph)
    log.info("Computed connected components...done")

    sc.stop()
  }
}

The input.graph file can look like this (10 nodes connected by 9 edges):

1 2
2 3
3 4
4 5
5 6
6 7
7 8
8 9
9 10

When it fails, it hangs in ConnectedComponents.run(graph). The error message is:

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.regex.Pattern.compile(Pattern.java:1054)
    at java.lang.String.replace(String.java:2239)
    at org.apache.spark.util.Utils$.getFormattedClassName(Utils.scala:1632)
    at org.apache.spark.storage.RDDInfo$$anonfun$1.apply(RDDInfo.scala:58)
    at org.apache.spark.storage.RDDInfo$$anonfun$1.apply(RDDInfo.scala:58)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.storage.RDDInfo$.fromRdd(RDDInfo.scala:58)
    at org.apache.spark.scheduler.StageInfo$$anonfun$1.apply(StageInfo.scala:80)
    at org.apache.spark.scheduler.StageInfo$$anonfun$1.apply(StageInfo.scala:80)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.scheduler.StageInfo$.fromStage(StageInfo.scala:80)
    at org.apache.spark.scheduler.Stage.<init>(Stage.scala:99)
    at org.apache.spark.scheduler.ShuffleMapStage.<init>(ShuffleMapStage.scala:44)
    at org.apache.spark.scheduler.DAGScheduler.newShuffleMapStage(DAGScheduler.scala:317)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$newOrUsedShuffleStage(DAGScheduler.scala:352)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage$1.apply(DAGScheduler.scala:286)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage$1.apply(DAGScheduler.scala:285)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.mutable.Stack.foreach(Stack.scala:170)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:285)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:389)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:386)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:386)
    at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:398)

I run a local Spark instance and start the JVM with the following options:

-Dspark.master=local -Dspark.local.dir=/home/phil/tmp/spark-tmp -Xms8g -Xmx8g
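
For completeness, the master and scratch directory can also be set programmatically; a minimal sketch (the heap size still has to be passed as -Xmx, since the driver JVM's heap cannot be changed after startup):

import org.apache.spark.{SparkConf, SparkContext}

// Programmatic equivalent of -Dspark.master=local and -Dspark.local.dir=...
val conf = new SparkConf()
  .setAppName("ConnectedComponents(input.graph)")
  .setMaster("local")
  .set("spark.local.dir", "/home/phil/tmp/spark-tmp")
val sc = new SparkContext(conf)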

Can you help me understand why the algorithm has problems with this toy graph (201 nodes and 200 edges), yet can solve a realistic graph with several million edges in about 80 seconds? (Both runs use the same setup and configuration.)

UPDATE:

The problem can also be reproduced in the spark-shell:

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

val graph = GraphLoader.edgeListFile(sc, "input.graph").cache()
ConnectedComponents.run(graph)
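
The input file is not even necessary; building the same path graph directly from an RDD of edges should reproduce the failure as well (a sketch, with n matching the 200-edge example above):

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

// Path graph 1 -> 2 -> ... -> 201, built in memory instead of from a file.
val n = 200L
val edges = sc.parallelize((1L to n).map(i => Edge(i, i + 1, 1)))
val pathGraph = Graph.fromEdges(edges, defaultValue = 1)
ConnectedComponents.run(pathGraph).vertices.count()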

I created a bug report: SPARK-15042


Solution

  • According to SPARK-15042, the problem still exists in 2.1.0-snapshot.

    Progress toward a fix can be tracked in SPARK-5484. The likely cause: the Pregel loop behind ConnectedComponents needs roughly one superstep per hop of the graph's diameter, and each superstep extends the RDD lineage, so a 200-hop path builds a DAG deep enough to exhaust the driver, while star-shaped or transitively closed graphs converge after a few supersteps. SPARK-5484 addresses this by checkpointing periodically inside Pregel; a workaround sketch follows below.
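
    Once a build with that fix is available, the workaround should amount to enabling periodic checkpointing; a sketch, assuming the spark.graphx.pregel.checkpointInterval setting introduced by SPARK-5484 (the interval and checkpoint directory here are arbitrary choices):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib._

    // Checkpoint the Pregel state every few supersteps so the RDD lineage
    // of the 200-hop path graph cannot grow without bound.
    val conf = new SparkConf()
      .setAppName("cc-with-checkpointing")
      .setMaster("local")
      .set("spark.graphx.pregel.checkpointInterval", "10")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/home/phil/tmp/spark-checkpoints") // required before checkpointing

    val graph = GraphLoader.edgeListFile(sc, "input.graph").cache()
    ConnectedComponents.run(graph)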