scala, apache-spark, spark-graphx

Scala Write the adjacency list of each node of a graph to a text file


I am trying to reverse a directed graph and write the adjacency list of each vertex to a text file in the format:

NodeId \t NeighbourId1,NeighbourId2,...,NeighbourIdn

So far I've only tried printing my output which is as follows:

(4,[J@13ad83aa)
(0,[J@338ff780)
(1,[J@6737f62b)
(3,[J@1250d788)
(2,[J@6d1fa6bb)

Whereas it should be in the format:

4   2
0   4
1   0,2
3   1,2,3
2   0,1

The current code I've been using is:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object Problem2 {
  def main(args: Array[String]): Unit = {
    val inputFile: String = args(0)
    val outputFolder = args(1)
    val conf = new SparkConf().setAppName("Problem2").setMaster("local")
    val sc = new SparkContext(conf)

    // Reverse the edge directions, keeping the original vertices
    val graph = GraphLoader.edgeListFile(sc, inputFile)
    val edges = graph.reverse.edges
    val vertices = graph.vertices
    val newGraph = Graph(vertices, edges)

    // Collect the outgoing neighbour ids of each vertex
    val verticesWithSuccessors: VertexRDD[Array[VertexId]] =
      newGraph.ops.collectNeighborIds(EdgeDirection.Out)

    val successorGraph = Graph(verticesWithSuccessors, edges)
    val res = successorGraph.vertices.collect()

    val adjList = successorGraph.vertices.foreach(println)
  }
}

I don't think mkString() can be used with a graph object. Is there a similar method for graph objects to get the string?


Solution

  • Let's take this example again:

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    val vertices: RDD[(VertexId, String)] =
        sc.parallelize(Array((1L,""), (2L,""), (4L,""), (6L,"")))
    
    
    val edges: RDD[Edge[String]] = 
        sc.parallelize(Array(
            Edge(1L, 2L, ""),
            Edge(1L, 4L, ""),
            Edge(1L, 6L, "")))
    val inputGraph = Graph(vertices, edges)
    
    val verticesWithSuccessors: VertexRDD[Array[VertexId]] = 
        inputGraph.ops.collectNeighborIds(EdgeDirection.Out)
    val successorGraph = Graph(verticesWithSuccessors, edges)
    
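    Since the end goal is a plain text file, the DataFrame detour is optional: you can format the VertexRDD directly and save it with saveAsTextFile. A minimal sketch, assuming the successorGraph built above and an output path outputFolder; the line formatter is plain Scala, and the commented lines show how it would be applied:

```scala
// Format one vertex and its neighbour ids as "NodeId\tId1,Id2,...".
// Plain Scala, so it can be tested without a Spark cluster.
def formatLine(node: Long, neighbours: Seq[Long]): String =
  s"$node\t${neighbours.mkString(",")}"

// Applied to the VertexRDD and saved as text (one part file per partition):
// successorGraph.vertices
//   .map { case (id, nbrs) => formatLine(id, nbrs.toSeq) }
//   .saveAsTextFile(outputFolder)
```

    This writes each vertex as its own line in the NodeId \t Id1,Id2,... layout asked for in the question.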

    Once you have this:

    val adjList = successorGraph.vertices
    

    You can translate into a DataFrame easily:

    import spark.implicits._  // assumes a SparkSession named spark in scope
    val df = adjList.toDF(Seq("node", "adjacents"): _*)
    df.show()
    +----+---------+
    |node|adjacents|
    +----+---------+
    |   1|[2, 4, 6]|
    |   2|       []|
    |   4|       []|
    |   6|       []|
    +----+---------+
    

    Now it's easy to transform the columns. Here is a not-so-pretty example:

    val result = df.rdd.collect().map { l =>
      l(0).asInstanceOf[Long] + "\t" + l(1).asInstanceOf[Seq[Long]].mkString(",")
    }
    result.foreach(println(_))
    
    1   2,4,6
    2   
    4   
    6   
    

    Alternatively, you can use UDFs or manipulate the columns however you like.
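    The UDF route can be sketched as follows; the joining logic is an ordinary Scala function, and the commented lines (assuming a SparkSession and the df from above are in scope) show one way to wire it in:

```scala
// Core joining logic: plain Scala, reusable and testable on its own.
val joinIds: Seq[Long] => String = ids => ids.mkString(",")

// Wrapped as a UDF and combined with concat_ws to build the tab-separated
// line, then written out as plain text:
// import org.apache.spark.sql.functions.{concat_ws, udf}
// val joinUdf = udf(joinIds)
// df.select(concat_ws("\t", df("node"), joinUdf(df("adjacents"))))
//   .write.text(outputFolder)
```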

    Hope this helps!