apache-spark spark-graphx

Spark GraphX program does not utilize CPU and memory


I have a function that calculates the closeness centrality of a node. It takes the neighbors map, which I pass in as a broadcast variable, and the id of the node itself. I map each vertex of the graph to the result of that function. When I open the task manager, the CPU is barely utilized, as if the job were not running in parallel, and the same goes for memory. Yet every node should execute the function in parallel, the data is large, and the job takes a long time to complete, so it is not as if it does not need the resources. Any help is truly appreciated, thank you.

To load the graph I use val graph = GraphLoader.edgeListFile(sc, path).cache

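import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, VertexId}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

// Note: FibonacciHeap and CollectNeighbors are not part of Spark;
// they are assumed to come from the asker's project or a third-party library.
object ClosenessCentrality {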

  case class Vertex(id: VertexId)

  def run(graph: Graph[Int, Float],sc: SparkContext): Unit = {
    // Have to reverse edges and make the graph undirected because it is bipartite
    val neighbors = CollectNeighbors.collectWeightedNeighbors(graph).collectAsMap()
    val bNeighbors = sc.broadcast(neighbors)

    val result = graph.vertices.map(f => shortestPaths(f._1,bNeighbors.value))
    //result.coalesce(1)
    result.count()

  }

  def shortestPaths(source: VertexId,  neighbors: Map[VertexId, Map[VertexId, Float]]): Double ={
    val predecessors = new mutable.HashMap[VertexId, ListBuffer[VertexId]]()
    val distances = new mutable.HashMap[VertexId, Double]()
    val q = new FibonacciHeap[Vertex]
    val nodes = new mutable.HashMap[VertexId, FibonacciHeap.Node[Vertex]]()

    distances.put(source, 0)

    for (w <- neighbors) {
      if (w._1 != source)
        distances.put(w._1, Int.MaxValue)

      predecessors.put(w._1, ListBuffer[VertexId]())
      val node = q.insert(Vertex(w._1), distances(w._1))
      nodes.put(w._1, node)
    }

    while (!q.isEmpty) {
      val u = q.minNode
      val node = u.data.id
      q.removeMin()
      //discover paths
      //println("Current node is:"+node+" "+neighbors(node).size)
      for (w <- neighbors(node).keys) {
        //print("Neighbor is"+w)
        val alt = distances(node) + neighbors(node)(w)
//        if (distances(w) > alt) {
//          distances(w) = alt
//          q.decreaseKey(nodes(w), alt)
//        }
//        if (distances(w) == alt)
//          predecessors(w).+=(node)
         if(alt< distances(w)){
           distances(w) = alt
           predecessors(w).+=(node)
           q.decreaseKey(nodes(w), alt)
         }

      }//For
    }
    val sum = distances.values.sum
    sum
  }
}

Solution

  • To answer your original question: I suspect that your RDD has only a single partition, so only a single core is used to process it.

    The edgeListFile method takes an argument that specifies the minimum number of edge partitions you want. You can also call repartition to get more partitions.

    You mentioned coalesce, but by default it can only reduce the number of partitions; see this question: Spark Coalesce More Partitions
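
The partition behaviour described above can be checked directly. The following is a minimal sketch, not the asker's code: it uses a plain RDD created with a single partition as a stand-in for graph.vertices, and the names PartitionSketch and partitionCounts are made up for illustration. It assumes Spark is on the classpath and runs in local mode.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, for illustration only.
object PartitionSketch {

  // Returns the partition counts of: the original RDD, coalesce(n),
  // coalesce(n, shuffle = true), and repartition(n).
  def partitionCounts(sc: SparkContext, n: Int): Seq[Int] = {
    // Stand-in for graph.vertices: an RDD deliberately created with one partition.
    val vertices = sc.parallelize(1L to 1000L, numSlices = 1)
    Seq(
      vertices.getNumPartitions,                             // single partition, single task
      vertices.coalesce(n).getNumPartitions,                 // without a shuffle, coalesce cannot add partitions
      vertices.coalesce(n, shuffle = true).getNumPartitions, // with a shuffle it can
      vertices.repartition(n).getNumPartitions               // repartition always shuffles
    )
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partition-sketch").setMaster("local[4]")
    val sc = new SparkContext(conf)
    try {
      println(partitionCounts(sc, 8).mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

Applied to the question, the same idea would be to pass a partition count when loading, for example GraphLoader.edgeListFile(sc, path, false, 8), where the fourth positional argument is the edge partition count (its parameter name differs between Spark versions, so it is passed positionally here), or to call graph.vertices.repartition(8) before the map. The value 8 is arbitrary; two to three tasks per available core is a common starting point.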