Tags: apache-spark, spark-graphx

Inspecting GraphX Graph Object


Spark version 1.6.1

Creating Edge and Vertex RDDs

import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.{Edge, Graph, VertexId}

val vertices_raw = sqlContext.read.json("vertices.json.gz")

val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[String]("index")))

val verticesRDD: RDD[(VertexId, String)] = vertices

val edges_raw = sqlContext.read.json("edges.json.gz")

val edgesRDD = edges_raw.rdd.map(row=>(Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong, row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length"))))

I have an edgesRDD that I can inspect:

[IN] edgesRDD
res10: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = MapPartitionsRDD[19] at map at <console>:38
[IN] edgesRDD.foreach(println)

Edge(5000005125036254,5000005125036231,42.26548472559799)
Edge(5000005125651333,5000005125651330,29.557979625165135)
Edge(5000005125651329,5000005125651330,81.9310872300414)

I have a verticesRDD

[IN] verticesRDD
res12: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[9] at map at <console>:38

[IN] verticesRDD.foreach(println)
(5000005125651331,343722)
(5000005125651332,343723)
(5000005125651333,343724)

I combine these to create a graph.

[IN] val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)
graph: org.apache.spark.graphx.Graph[String,Double] = org.apache.spark.graphx.impl.GraphImpl@303bbd02

I can inspect the edgesRDD within the graph object:

[IN] graph.edges.foreach(println)

Edge(5000005125774813,4000000029917080,72.9742898009203)
Edge(5000005125774814,5000005125774813,49.87951589790352)
Edge(5000005125775080,4000000029936370,69.62871049042008)

However, when I inspect the verticesRDD within the graph object, I get an error. Is there an issue with my graph construction?

[IN] graph.vertices.foreach(println)

ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 13)
java.lang.ArrayStoreException: java.lang.Long
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/08/17 12:27:16 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 13, localhost): java.lang.ArrayStoreException: java.lang.Long

16/08/17 12:27:16 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 13, localhost): java.lang.ArrayStoreException: java.lang.Long

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
        at $iwC$$iwC$$iwC.<init>(<console>:68)
        at $iwC$$iwC.<init>(<console>:70)
        at $iwC.<init>(<console>:72)
        at <init>(<console>:74)
        at .<init>(<console>:78)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArrayStoreException: java.lang.Long

Edit. After some digging, is this related to the VertexId requirements? I have checked them:

type VertexId = Long
A 64-bit vertex identifier that uniquely identifies a vertex within a graph.

The unique values I have provided, for example 5000005125036318, fit within a 64-bit Long, so they appear satisfactory.
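
For reference, a quick sanity check (a hypothetical snippet against one literal value from my data, not part of the pipeline) confirms that a stripped toid parses into a valid Long:

// Hypothetical check: the stripped toid fits comfortably in a 64-bit Long (VertexId).
val checkId: Long = "osgb5000005125036318".stripPrefix("osgb").toLong
// checkId: Long = 5000005125036318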


Solution

  • Yes. Your issue is the ArrayStoreException: your current code tries to store values of type Long into an array of String.

    ArrayStoreException is thrown to indicate that an attempt has been made to store the wrong type of object into an array of objects

    Why ArrayStoreException?

    Below is a snapshot from your vertices.json.gz file:

    {"toid": "osgb4000000031043205", "index": 1, "point": [508180.748, 195333.973]}
    {"toid": "osgb4000000031043206", "index": 2, "point": [508163.122, 195316.627]}
    {"toid": "osgb4000000031043207", "index": 3, "point": [508172.075, 195325.719]}
    {"toid": "osgb4000000031043208", "index": 4, "point": [508513, 196023]}
    

    Where "index" values are by default read as LongType when creating vertices_raw DataFrame, as seen below:

    scala> vertices_raw.schema
    res4: org.apache.spark.sql.types.StructType = StructType(StructField(index,LongType,true), StructField(point,ArrayType(DoubleType,true),true), StructField(toid,StringType,true))
    

    Note that row.getAs[String]("index") does not actually convert the value; because of type erasure, the underlying java.lang.Long passes through unchanged. So when you create your graph as Graph[String, Double], those Long values end up being stored into an array of String, which causes the exception:

    val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)
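
    To see the mechanism outside Spark, here is a minimal sketch (hypothetical code, not from your pipeline) that mimics Row.getAs and the ClassTag-built vertex-attribute array GraphX uses internally; it fails with the same java.lang.ArrayStoreException: java.lang.Long:

    import scala.reflect.ClassTag

    // Mimics Row.getAs[T]: the cast is erased, so nothing is checked here.
    def getAsErased[T](v: Any): T = v.asInstanceOf[T]

    // Mimics GraphX filling its Array[VD] of vertex attributes (ScalaRunTime.array_update).
    def storeAttr[VD: ClassTag](value: VD): Array[VD] = {
      val arr = new Array[VD](1)   // a real String[] when VD = String
      arr(0) = value               // the JVM's array-store check fires here
      arr
    }

    storeAttr[String](getAsErased[String](1L))   // java.lang.ArrayStoreException: java.lang.Long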
    


    Solution 1:

    Use Long for the index attribute, i.e. replace the lines below:

    val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[String]("index")))
    
    val verticesRDD: RDD[(VertexId, String)] = vertices
    
    val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)
    

    with:

    val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[Long]("index")))
    
    val verticesRDD: RDD[(VertexId, Long)] = vertices
    
    val graph: Graph[(Long),Double] = Graph(verticesRDD, edgesRDD)
    


    Solution 2:

    Create a new DataFrame vertices_raw2 from vertices_raw, converting the index column from LongType to StringType, as shown below:

    import org.apache.spark.sql.functions._
    
    val to_string = udf[String, Long]( _.toString)
    
    val vertices_raw2 = vertices_raw.withColumn("index", to_string(vertices_raw("index"))).select("index", "toid")
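
    As an aside, the same conversion can also be done without a UDF by casting the column directly (a sketch; the result is assumed equivalent):

    val vertices_raw2 = vertices_raw.withColumn("index", vertices_raw("index").cast("string")).select("index", "toid")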
    

    and then use the vertices_raw2 DataFrame to create your vertices RDD:

    val vertices = vertices_raw2.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[String]("index")))
    

    Output:

    scala> graph.edges.foreach(println)
    Edge(5000005125740769,4000000029965899,51.55460482650549)
    Edge(5000005125740770,5000005125740759,26.108461618676447)
    Edge(5000005125740771,5000005125740763,30.841246458481766) ...
    
    scala> graph.vertices.foreach(println)
    (4000000029867298,58335)
    (4000000029892180,10846)
    (4000000027730512,338018)
    (4000000023185673,43945) ...
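
    As a final check (a hypothetical follow-up, not from the original output), simple actions on the rebuilt graph now complete without the ArrayStoreException:

    // Hypothetical sanity checks; the counts depend on your data.
    graph.vertices.count()
    graph.edges.count()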