scala, apache-spark-sql, rdd, spark-graphx

Scala-Spark: Convert Dataframe to RDD[Edge]


I have a dataframe that represents the edges of a graph; this is the schema:

root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- relationship: struct (nullable = false)
 |    |-- business_id: string (nullable = true)
 |    |-- normalized_influence: double (nullable = true)

I want to convert it to an RDD[Edge] to work with the Pregel API, and my difficulty is with the "relationship" attribute. How can I convert it?
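
For reference, here is a minimal, self-contained sketch of how a dataframe with this exact schema could be reproduced for testing; the sample rows, the local SparkSession, and the app name are made up:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._

    // Hypothetical local session, just for experimenting
    val spark = SparkSession.builder()
      .appName("edge-conversion")
      .master("local[*]")
      .getOrCreate()

    // Mirror the printed schema, including the nullability flags
    val schema = StructType(Seq(
      StructField("src", StringType, nullable = true),
      StructField("dst", StringType, nullable = true),
      StructField("relationship", StructType(Seq(
        StructField("business_id", StringType, nullable = true),
        StructField("normalized_influence", DoubleType, nullable = true)
      )), nullable = false)
    ))

    // Made-up sample edges
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(
        Row("userA", "userB", Row("biz_1", 0.75)),
        Row("userB", "userC", Row("biz_2", 0.10))
      )),
      schema
    )

    df.printSchema() // should print the tree shown above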


Solution

  • Edge is a parameterized class, which means you can store whatever you like in each edge besides the source and destination ids. In your case it would probably be an Edge[Relationship]. You can use case classes to map the dataframe rows into the RDD[Edge[Relationship]]:

    import org.apache.spark.graphx.Edge
    import org.apache.spark.rdd.RDD
    import scala.util.hashing.MurmurHash3

    import spark.implicits._ // provides the encoder for .as[MyEdge]; assumes a SparkSession named `spark` is in scope

    case class Relationship(business_id: String, normalized_influence: Double)
    case class MyEdge(src: String, dst: String, relationship: Relationship)

    val edges: RDD[Edge[Relationship]] = df.as[MyEdge].rdd.map { edge =>
      Edge(
        // VertexId is a Long, so hash the string ids into Longs
        MurmurHash3.stringHash(edge.src).toLong,
        MurmurHash3.stringHash(edge.dst).toLong,
        edge.relationship
      )
    }
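
Once you have `edges`, the usual next step before calling the Pregel API is to build a vertex RDD with the same hash and assemble a Graph. A rough sketch, reusing the `df`, `edges`, and `Relationship` defined above; the final aggregation is only an illustrative example, not part of the original answer:

    import org.apache.spark.graphx.{Graph, VertexId}

    // Re-derive vertex ids with the same MurmurHash3 so they line up with `edges`.
    // Note: hashing string ids to Long can collide on very large vertex sets.
    val vertices: RDD[(VertexId, String)] = df
      .select("src").union(df.select("dst"))
      .distinct()
      .as[String]
      .rdd
      .map(name => (MurmurHash3.stringHash(name).toLong, name))

    val graph: Graph[String, Relationship] = Graph(vertices, edges)

    // Illustrative aggregation: total normalized_influence arriving at each vertex
    val influence = graph.aggregateMessages[Double](
      ctx => ctx.sendToDst(ctx.attr.normalized_influence),
      _ + _
    )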