scala apache-spark spark-graphx

Scala: getting the adjacency list of each vertex from a list of source and destination nodes


I'm fairly new to Scala and GraphX, so this might be a trivial question. I'm trying to reverse the direction of each edge of a graph and then get the adjacency list of each vertex of the new, reversed graph. The input is in the form "FromNodeId \t ToNodeId":

0 1
0 2
1 2
1 3

I managed to reverse the directions of edges with the following code:

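import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader

object Problem2 {
    def main(args: Array[String]): Unit = {
        val inputFile: String = args(0)
        val outputFolder = args(1)
        val conf = new SparkConf().setAppName("Problem2").setMaster("local")
        val sc = new SparkContext(conf)
        val input = sc.textFile(inputFile)
        // Load the edge list into a graph, then flip the direction of every edge
        val graph = GraphLoader.edgeListFile(sc, inputFile)
        val newGraph = graph.reverse.edges
        // Print the reversed edges
        newGraph.collect().foreach(println)
    }
}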

The output is of the form

Edge(1,0,1)
Edge(3,1,1)
Edge(2,1,1)
Edge(2,0,1)

My questions are: 1. Is there a more efficient approach to this problem? 2. How can I proceed to build the adjacency list for each vertex from here?


Solution

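  • The answer to both questions is the collectNeighbors and collectNeighborIds methods.

    For each vertex, they compute the list of its neighbors: collectNeighbors returns the neighbors' ids together with their attributes, while collectNeighborIds returns only the ids. Note the EdgeDirection parameter, which selects the edges to follow (for example, EdgeDirection.Out for outgoing edges or EdgeDirection.In for incoming ones).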

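    import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId, VertexRDD}
    import org.apache.spark.rdd.RDD

    // Four vertices with empty string attributes
    val vertices: RDD[(VertexId, String)] =
        sc.parallelize(Array((1L,""), (2L,""), (4L,""), (6L,"")))

    // Three edges, all leaving vertex 1
    val edges: RDD[Edge[String]] =
        sc.parallelize(Array(
            Edge(1L, 2L, ""),
            Edge(1L, 4L, ""),
            Edge(1L, 6L, "")))
    val inputGraph = Graph(vertices, edges)

    // For each vertex, collect the ids of the vertices its outgoing edges point to
    val verticesWithSuccessors: VertexRDD[Array[VertexId]] =
        inputGraph.ops.collectNeighborIds(EdgeDirection.Out)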
    

    Since a graph is immutable, you have to build a new one, with the same edges and the new vertices:

    val successorGraph = Graph(verticesWithSuccessors, edges)
    

    And here is a partial result:

    val res = successorGraph.vertices.collect()
    res: Array[(org.apache.spark.graphx.VertexId,
             Array[org.apache.spark.graphx.VertexId])] = 
      Array((4,Array()), (1,Array(2, 4, 6)), (6,Array()), (2,Array()))
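
Applied to the graph from the question, the same idea might look like the sketch below. It relies on the observation that a vertex's outgoing neighbors in the reversed graph are its incoming neighbors in the original graph, so collectNeighborIds(EdgeDirection.In) on the original graph should give the reversed adjacency list without calling reverse at all. The object name ReversedAdjacency and the helper reversedAdjacency are made up for illustration, and the four edges are hard-coded from the sample input rather than read from a file.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}

// Hypothetical helper object, named here only for illustration.
object ReversedAdjacency {

  // Builds a graph from (source, destination) pairs and returns, for every
  // vertex, the sorted ids of its in-neighbors. These are exactly the
  // out-neighbors the vertex would have in the reversed graph.
  def reversedAdjacency(sc: SparkContext,
                        edgePairs: Seq[(Long, Long)]): Map[VertexId, Seq[VertexId]] = {
    val edges = sc.parallelize(edgePairs.map { case (src, dst) => Edge(src, dst, 1) })
    val graph = Graph.fromEdges(edges, defaultValue = 1)
    graph.collectNeighborIds(EdgeDirection.In)
      .collect()
      .map { case (id, neighbors) => id -> neighbors.sorted.toSeq }
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReversedAdjacency").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // The four edges from the question's sample input
      val adjacency = reversedAdjacency(sc, Seq((0L, 1L), (0L, 2L), (1L, 2L), (1L, 3L)))
      adjacency.toSeq.sortBy(_._1).foreach { case (id, neighbors) =>
        println(s"$id -> ${neighbors.mkString(", ")}")
      }
    } finally {
      sc.stop()
    }
  }
}
```

For the sample input, vertex 2 ends up with neighbors 0 and 1, vertices 1 and 3 with one neighbor each, and vertex 0 with an empty list. When loading from a file as in the question, the graph returned by GraphLoader.edgeListFile can be used in place of the one built with Graph.fromEdges.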