
GraphX - Weighted shortest path implementation - java.lang.NoSuchMethodError


Edit - I discovered that the book was written for Spark 1.6, whose default Scala version is 2.10, while the rest of my project uses Scala 2.11.
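If a version mismatch really were the cause: scala.runtime.ObjectRef.create exists in Scala 2.11 but not in 2.10, so compiling against one binary version and running on the other produces exactly this kind of NoSuchMethodError. A minimal build.sbt sketch that keeps the versions aligned (the version numbers here are illustrative assumptions, not from the original setup):

// build.sbt -- hypothetical sketch: pin one Scala binary version and let
// sbt's %% operator pick the Spark artifacts built for it
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-graphx" % "1.6.3" % "provided"
)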

I am trying to implement a weighted shortest path algorithm from Michael Malak and Robin East's book Spark GraphX in Action. The part in question is Listing 6.4, "Executing the shortest path algorithm that uses breadcrumbs", from Chapter 6.

I have my own graph that I create from two RDDs. There are 344436 vertices and 772983 edges. I can perform an unweighted shortest path computation using the native GraphX library, so I am confident the graph is constructed correctly.
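That unweighted check looks roughly like the following sketch, assuming the graph defined below and using 1L as an arbitrary landmark (GraphX's built-in ShortestPaths counts hops and ignores edge weights):

import org.apache.spark.graphx.lib.ShortestPaths

// Hop-count shortest paths from every vertex to the landmark vertex 1L
val unweighted = ShortestPaths.run(my_graph, Seq(1L))
unweighted.vertices.take(5).foreach(println)  // (vertexId, Map(landmarkId -> hopCount))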

In this case I use their Dijkstra's implementation as follows:

val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).cache()

def dijkstra[VD](g:Graph[VD,Double], origin:VertexId) = {
  var g2 = g.mapVertices(
      (vid,vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
  )

  for (i <- 1L to g.vertices.count-1) {
    val currentVertexId = g2.vertices
      .filter(!_._2._1)
      .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
          (a,b) => if (a._2._2 < b._2._2) a else b)
      )
      ._1

    val newDistances = g2.aggregateMessages[(Double, List[VertexId])](
        ctx => if (ctx.srcId == currentVertexId) {
          ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
        },
        (a,b) => if (a._1 < b._1) a else b
    )

    g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
      val newSumVal = newSum.getOrElse((Double.MaxValue,List[VertexId]()))

      (
          vd._1 || vid == currentVertexId,
          math.min(vd._2, newSumVal._1),
          if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
      )
    })

  }

  g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
    (vd, dist.getOrElse((false,Double.MaxValue,List[VertexId]()))
      .productIterator.toList.tail
  ))
}

//  Path Finding - random node from which to find all paths
val v1 = 4000000028222916L

I then call their function with my graph and a random vertex ID. Previously I had issues with v1 not being recognised as a Long; adding the L suffix solved this.

val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect

println(results)

However, this returns the following:

Error: Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
    at GraphX$.dijkstra$1(GraphX.scala:51)
    at GraphX$.main(GraphX.scala:85)
    at GraphX.main(GraphX.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Line 51 refers to the line var g2 = g.mapVertices( and line 85 refers to the line val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect

What method is this exception referring to? I am able to package with sbt without error, and I cannot see what method I am calling that does not exist.
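(One quick runtime diagnostic, not from the original post: print the Scala version the driver actually runs on and compare it with the version you compiled against.)

// Hypothetical diagnostic: print the Scala runtime version of the driver.
// Running 2.11-compiled code on a 2.10 runtime fails with exactly this
// kind of NoSuchMethodError on scala.runtime.ObjectRef.create.
println(scala.util.Properties.versionString)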


Solution

  • The issue wasn't a version error or a missing implementation, but a misleading error from the compiler.

    Ok, so here is the thing: after investigating the code, I noticed that the following section contains one extra closing parenthesis:

    val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
      .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
        (a, b) => if (a._2._2 < b._2._2) a else b))._1 
                                                  ^
                                                  |
    

    You just need to remove that extra parenthesis and it will work. Here is the full corrected code:

    // scala> :pa
    // Entering paste mode (ctrl-D to finish)

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
      // Vertex state: (visited?, tentative distance, breadcrumb path from origin)
      var g2 = g.mapVertices(
        (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
      )

      for (i <- 1L to g.vertices.count - 1) {
        // Select the unvisited vertex with the smallest tentative distance
        val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
          .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
            (a, b) => if (a._2._2 < b._2._2) a else b)._1

        // Propose new distances along the current vertex's outgoing edges
        val newDistances: VertexRDD[(Double, List[VertexId])] =
          g2.aggregateMessages[(Double, List[VertexId])](
            ctx => if (ctx.srcId == currentVertexId) {
              ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
            },
            (a, b) => if (a._1 < b._1) a else b
          )

        // Keep the shorter of the old and newly proposed distances,
        // and mark the current vertex as visited
        g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
          val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
          (
            vd._1 || vid == currentVertexId,
            math.min(vd._2, newSumVal._1),
            if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
          )
        })
      }

      // Re-attach the original vertex attributes and drop the 'visited' flag
      g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
        (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
          .productIterator.toList.tail
        ))
    }
    
    //  Path Finding - random node from which to find all paths
    

    Now, let's test it:

    val myVertices: RDD[(VertexId, String)] = sc.makeRDD(Array(
      (1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
    val myEdges: RDD[Edge[Double]] = sc.makeRDD(Array(
      Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0), Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0),
      Edge(2L, 5L, 7.0), Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),
      Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))
    
    val my_graph = Graph(myVertices, myEdges).cache()
    
    val v1 = 4000000028222916L
    
    val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect
    
    // [CTRL-D]
    // Exiting paste mode, now interpreting.
    // [Lscala.Tuple2;@668a0785                                                        
    // import org.apache.spark.graphx._
    // myVertices: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, String)] = ParallelCollectionRDD[556] at makeRDD at <console>:37
    // myEdges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = ParallelCollectionRDD[557] at makeRDD at <console>:39
    // my_graph: org.apache.spark.graphx.Graph[String,Double] = org.apache.spark.graphx.impl.GraphImpl@49ea0d90
    // dijkstra: [VD](g: org.apache.spark.graphx.Graph[VD,Double], origin: org.apache.spark.graphx.VertexId)org.apache.spark.graphx.Graph[(VD, List[Any]),Double]
    // v1: Long = 4000000028222916
    // results: Array[(String, List[Any])] = Array((A,List(0.0, List())), (B,List(7.0, List(1))), (C,List(15.0, Li...
    scala> results.foreach(println)
    // (A,List(0.0, List()))
    // (B,List(7.0, List(1)))
    // (C,List(15.0, List(1, 2)))
    // (D,List(5.0, List(1)))
    // (E,List(14.0, List(1, 2)))
    // (F,List(11.0, List(1, 4)))
    // (G,List(22.0, List(1, 4, 6)))
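
    Each row of results is (vertex attribute, List(distance, breadcrumb path)). To read off a single target, you can filter the result graph's vertices directly; a small sketch, using D's vertex id 4L:

    // Distance and breadcrumb path from the origin 1L to vertex 4L ("D")
    val toD = dijkstra(my_graph, 1L).vertices.filter(_._1 == 4L).collect()
    toD.foreach(println)  // (4,(D,List(5.0, List(1))))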