Search code examples
apache-sparkspark-graphx

Problems running Spark GraphX algorithms on generated graphs


I have created a graph in Spark GraphX using the following codes. (See my question and solution)

import scala.math.random
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random
import org.apache.spark.HashPartitioner

object SparkER {

  val nPartitions: Integer = 4
  val n: Long = 100
  val p: Double = 0.1

  def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
    (0L until n).filter(_ % nPartitions == i).toIterator
  }

  def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
    (i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j, ()))
  }

  def genEdgesForPartition(iter: Iterator[Long]) = {
    val random = new Random(new java.security.SecureRandom())
    iter.flatMap(genEdgesForId(p, n, random))
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark ER").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val empty = sc.parallelize(Seq.empty[Int], nPartitions)
    val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))

    val edges = ids.mapPartitions(genEdgesForPartition)
    val vertices: VertexRDD[Unit] = VertexRDD(ids.map((_, ())))

    val graph = Graph(vertices, edges)

    val cc = graph.connectedComponents().vertices //Throwing Exceptions

    println("Stopping Spark Context")
    sc.stop()
  }
}

Now, I can access the graph and see the degrees of the nodes. But when I try to get some measures, such as Connected components, I am getting the following exceptions.

15/12/22 12:12:57 ERROR Executor: Exception in task 3.0 in stage 6.0 (TID 19)
java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
    at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
15/12/22 12:12:57 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 17)
java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
    at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Why am I nable to perform these operations on the generated graph using GraphX?


Solution

  • I found that, if I do the following the exception does not occur.

    val graph = Graph(vertices, edges).partitionBy(PartitionStrategy.RandomVertexCut)
    

    Apparently, some GraphX algorithms require the repartitioning. But the purpose is not entirely clear to me.