Tags: scala, apache-spark, spark-graphx

Spark GraphX: requirement failed: Invalid initial capacity


I am new to Spark and Scala.

For a hobby project, I am trying to perform triangle counting on this dataset: DataSet

This is the code I have written so far:

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.Edge
    import org.apache.spark.graphx.Graph
    import org.apache.spark.graphx.Graph.graphToGraphOps
    import org.apache.spark.graphx.PartitionStrategy
    import org.apache.spark.rdd.RDD
    import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

    object GraphXApps {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("GraphXApps")
          .setSparkHome(System.getenv("SPARK_HOME"))
          .setJars(SparkContext.jarOfClass(this.getClass).toList)

        val sc = new SparkContext(conf)

        // Load the edges in canonical order and partition the graph for triangle count
        val edges: RDD[Edge[String]] =
          sc.textFile(args(0)).map { line =>
            val fields = line.split("\t")
            Edge(fields(0).toLong, fields(1).toLong)
          }

        val graph: Graph[String, String] = Graph
          .fromEdges(edges.sortBy(_.srcId, ascending = true, 1), "defaultProperty")
          .partitionBy(PartitionStrategy.RandomVertexCut)

        // Find the triangle count for each vertex
        val triCounts = graph.triangleCount().vertices
        val triCountById = graph.vertices.join(triCounts).map(_._2._2)

        // Print the result
        println(triCountById.collect().mkString("\n"))

        sc.stop()
      }
    }
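
For reference, the input is expected to be a tab-separated edge list, one pair of numeric vertex IDs per line, which is the shape the split("\t")/toLong parsing above assumes, e.g.:

    1	2
    1	3
    2	3
    3	4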

But I am getting this error: java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity

Please let me know where I am going wrong; any help would be appreciated.

Full Stack Trace

16/10/31 01:03:08 ERROR TaskSetManager: Task 0 in stage 8.0 failed 1 times; aborting job
16/10/31 01:03:08 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool 
16/10/31 01:03:08 INFO TaskSchedulerImpl: Cancelling stage 8
16/10/31 01:03:08 INFO DAGScheduler: ShuffleMapStage 8 (mapPartitions at VertexRDDImpl.scala:245) failed in 0.131 s
16/10/31 01:03:08 INFO DAGScheduler: Job 0 failed: collect at GraphXApps.scala:47, took 3.128921 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 8, localhost): java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:51)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:57)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:70)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:69)
    at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:154)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
    at GraphXApps$.main(GraphXApps.scala:47)
    at GraphXApps.main(GraphXApps.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:51)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:57)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:70)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:69)
    at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:154)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Solution

This appears to be a bug in Spark 2.0 (so far I have tested it against 2.0, 2.0.1, and 2.0.2). The Jira ticket [SPARK-18200]: GraphX Invalid initial capacity when running triangleCount has been created to address this.
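
Judging from your stack trace, the failure happens while TriangleCount builds a per-vertex hash set sized by that vertex's neighbour count, and OpenHashSet in Spark 2.0.x rejects an initial capacity of 0. On that reading (an assumption on my part, not something confirmed in the ticket), a vertex whose only edge is a self-loop should be enough to trigger it, since its neighbour set is empty once self-edges are dropped. A minimal sketch of such a repro:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.{Edge, Graph}

    object TriangleCountRepro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("TriangleCountRepro").setMaster("local[*]"))

        val edges = sc.parallelize(Seq(
          Edge(1L, 2L, "e"), Edge(2L, 3L, "e"), Edge(3L, 1L, "e"), // one triangle
          Edge(4L, 4L, "e") // self-loop: vertex 4 has no neighbours once self-edges are removed
        ))
        val graph = Graph.fromEdges(edges, "defaultProperty")

        // On Spark 2.0.x this is expected to throw:
        //   java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
        println(graph.triangleCount().vertices.collect().mkString("\n"))

        sc.stop()
      }
    }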

Your code should work with Spark 1.6, as noted in this linked notebook.

But, as you noted, it fails in Spark 2.0, as noted in this linked notebook.

In the interim, please try Spark 1.6 or try using GraphFrames, as noted in this linked notebook; a sketch of the GraphFrames route is below.
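
For illustration, here is roughly what the GraphFrames route looks like (a sketch, untested against your data; it assumes the same tab-separated edge list and that the graphframes package is on the classpath, e.g. via --packages graphframes:graphframes:0.2.0-spark2.0-s_2.11):

    import org.apache.spark.sql.SparkSession
    import org.graphframes.GraphFrame

    object GraphFramesTriangles {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("GraphFramesTriangles").getOrCreate()

        // Read the tab-separated edge list into a DataFrame with the
        // "src" and "dst" columns that GraphFrame expects.
        val edges = spark.read
          .option("sep", "\t")
          .csv(args(0))
          .toDF("src", "dst")

        // Derive the vertex DataFrame from the edge endpoints.
        val g = GraphFrame.fromEdges(edges)

        // triangleCount.run() returns the vertex DataFrame with an added "count" column.
        g.triangleCount.run().select("id", "count").show()

        spark.stop()
      }
    }

GraphFrames implements its own triangle counting on DataFrames, so it should sidestep the failing GraphX code path.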

    HTH!