Tags: scala, apache-spark, dataframe, dataset, graphframes

How to map values in one or more columns of one dataset to values from another dataset


I am working on the GraphFrames part of a project, where the edges/links passed to d3.js need to use the index values of the vertices/nodes as source and destination.

I have the following VertexDF:

+--------------------+-----------+
|                  id|      rowID|
+--------------------+-----------+
|      Raashul Tandon|          3|
|         Helen Jones|          5|
+--------------------+-----------+

EdgesDF

+-------------------+--------------------+
|                src|                 dst|
+-------------------+--------------------+
|     Raashul Tandon|         Helen Jones|
+-------------------+--------------------+

I need to transform EdgesDF into the following:

+-------------------+--------------------+
|                src|                 dst|
+-------------------+--------------------+
|                  3|                   5|
+-------------------+--------------------+

Every src and dst value should be replaced by the rowID of that name in VertexDF. I am expecting a solution using higher-order functions. My approach is to convert VertexDF to a map, then iterate over EdgesDF and replace every occurrence.

What I have tried

I made a map of names to IDs:

// Build a driver-side Map from vertex name to rowID
val Actmap = VertxDF.collect().map(f => {
  val name = f.getString(0)
  val id = f.getLong(1)   // assumes rowID is a LongType column
  (name, id)
}).toMap

Then I used that map with EdgesDF:

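// Look up both endpoints of each edge (Actmap.get returns an Option)
EdgesDF.collect().map(f => {
  val src = f.getString(0)
  val dst = f.getString(1)   // dst is the second column, not the first

  val src_id = Actmap.get(src)
  val dst_id = Actmap.get(dst)
  (src_id, dst_id)
})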
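For reference, the map idea can be kept without collecting EdgesDF: only the vertex side is collected, and the resulting `Map` is embedded in the query as a literal map column via `typedLit`, so `element_at` performs the lookups inside Spark. This is a minimal sketch, assuming Spark 2.4+ (for `element_at`), an `Int` rowID, and a local `SparkSession` created only so the example runs on its own. The map travels with the query plan, so this is still only reasonable when the vertex set is small; under the default (non-ANSI) settings a name missing from the map should come back as null.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, element_at, typedLit}

// Local session only so the sketch is self-contained
val spark = SparkSession.builder().master("local[*]").appName("edge-map-lookup").getOrCreate()
import spark.implicits._

// Same shape as the question's data (rowID assumed to be Int here)
val VertxDF = Seq(("Raashul Tandon", 3), ("Helen Jones", 5)).toDF("id", "rowID")
val EdgesDF = Seq(("Raashul Tandon", "Helen Jones")).toDF("src", "dst")

// Collect only the vertex side: name -> rowID (tuple fields are mapped by column position)
val nameToId: Map[String, Int] = VertxDF.as[(String, Int)].collect().toMap

// Embed the map as a literal MapType column so the lookup runs inside Spark
val idMap = typedLit(nameToId)

// Replace each name by its rowID; EdgesDF itself is never collected to the driver
val indexed = EdgesDF.select(
  element_at(idMap, col("src")).as("src"),
  element_at(idMap, col("dst")).as("dst"))

indexed.show()
```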


Solution

  • Your approach of collecting the vertex and edge DataFrames to the driver works only if they are small enough to fit in driver memory. I would suggest left-joining the edge DataFrame with the vertex DataFrame (once for src, once for dst) to get what you need:

    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    val VertxDF = Seq(
      ("Raashul Tandon", 3),
      ("Helen Jones", 5),
      ("John Doe", 6),
      ("Rachel Smith", 7)
    ).toDF("id", "rowID")
    
    val EdgesDF = Seq(
      ("Raashul Tandon", "Helen Jones"),
      ("Helen Jones", "John Doe"),
      ("Unknown", "Raashul Tandon"),
      ("John Doe", "Rachel Smith")
    ).toDF("src", "dst")
    
    EdgesDF.as("e").
      join(VertxDF.as("v1"), $"e.src" === $"v1.id", "left_outer").
      join(VertxDF.as("v2"), $"e.dst" === $"v2.id", "left_outer").
      select($"v1.rowID".as("src"), $"v2.rowID".as("dst")).
      show
    // +----+---+
    // | src|dst|
    // +----+---+
    // |   3|  5|
    // |   5|  6|
    // |null|  3|
    // |   6|  7|
    // +----+---+
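If edges whose endpoints have no matching vertex (like the `"Unknown"` row above) should be dropped rather than kept with a null ID, inner joins do that directly. The sketch below also wraps the vertex DataFrame in `broadcast(...)`, a join hint that only makes sense under the assumption that the vertex table is small; the local `SparkSession` is there just to keep the example self-contained.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Local session only so the sketch is self-contained
val spark = SparkSession.builder().master("local[*]").appName("edge-index-join").getOrCreate()
import spark.implicits._

val VertxDF = Seq(
  ("Raashul Tandon", 3), ("Helen Jones", 5), ("John Doe", 6), ("Rachel Smith", 7)
).toDF("id", "rowID")

val EdgesDF = Seq(
  ("Raashul Tandon", "Helen Jones"), ("Helen Jones", "John Doe"),
  ("Unknown", "Raashul Tandon"), ("John Doe", "Rachel Smith")
).toDF("src", "dst")

// Hint that the (assumed small) vertex table can be shipped to every executor
val v = broadcast(VertxDF)

// Default join type is inner: an edge survives only if both src and dst match a vertex
val indexedEdges = EdgesDF.as("e").
  join(v.as("v1"), $"e.src" === $"v1.id").
  join(v.as("v2"), $"e.dst" === $"v2.id").
  select($"v1.rowID".as("src"), $"v2.rowID".as("dst"))

indexedEdges.show()
```

Row order after a join is not guaranteed, so add an `orderBy` if the output order matters downstream.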