I'm experimenting with Spark, and I'm new to Scala and GraphX.
I'm working on a bipartite network of movies and the actors who played in them. My input is a space-separated file with one edge per line: the first column is the movie ID, the second the actor ID. Here is an extract:
movie actor
1 1
2 1
2 2
3 1
3 3
3 4
3 5
3 6
3 7
4 1
My goal is to study the relationships between actors. To do so, I want to create a unipartite actor-actor graph whose edge values depend on the number of movies the two actors have in common.
I start by creating an RDD of edges:
val edges: RDD[Edge[String]] = sc.textFile("file:///home/actorMovie").map { line =>
  val fields = line.split(" ")
  Edge(fields(0).toLong, fields(1).toLong)
}
Which looks OK:
edges.take(10)
res8: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(1,1,null), Edge(2,1,null), Edge(2,2,null), Edge(3,1,null), Edge(3,3,null), Edge(3,4,null), Edge(3,5,null), Edge(3,6,null), Edge(3,7,null), Edge(4,1,null))
From there, I don't know how to transform it into a weighted unipartite graph.
What you have right now is not a valid bipartite graph: movie IDs and actor IDs share the same value range, so vertex 1 is ambiguous. To make it a valid graph you'll need a number of transformations. The easiest way is to use DataFrames:
val df = spark.read
  .option("delimiter", " ")
  .option("header", "true")  // the first line ("movie actor") is a header
  .csv(path)
  .toDF("movie", "actor")
Once you have the data, self-join and aggregate like this:
import org.apache.spark.sql.functions._
import spark.implicits._

val e = df.alias("df1")
  .join(df.alias("df2"), "movie")
  // Keep each unordered actor pair exactly once: this drops self-loops
  // and avoids counting every common movie twice (once per ordering).
  .where($"df1.actor" < $"df2.actor")
  .groupBy(
    $"df1.actor".as("srcId"),
    $"df2.actor".as("dstId"))
  .agg(count("*").as("attr"))
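As a sanity check, the same pair-counting logic can be sketched with plain Scala collections (no Spark needed), using the sample edges from the question:

```scala
// Plain-Scala sketch of the self-join-and-count logic above,
// run on the sample edge list from the question.
val sample = Seq(
  (1, 1), (2, 1), (2, 2), (3, 1), (3, 3),
  (3, 4), (3, 5), (3, 6), (3, 7), (4, 1))

// "Self-join" on the movie column, keeping each unordered actor
// pair once (a1 < a2 also drops self-pairs), then count movies.
val weights = (for {
  (m1, a1) <- sample
  (m2, a2) <- sample
  if m1 == m2 && a1 < a2
} yield (a1, a2))
  .groupBy(identity)
  .map { case (pair, rows) => pair -> rows.size }

println(weights((1, 2))) // actors 1 and 2 share exactly one movie (movie 2)
```

In this sample, movie 2 contributes the pair (1, 2) and movie 3 contributes all 15 pairs among its six actors, so `weights` has 16 entries, each with weight 1.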
This can be converted to a Graph or GraphFrame as I've shown in How to obtain convert DataFrame to specific RDD?.
To create a valid graph you'd have to encode the labels first, since movie IDs and actor IDs overlap. For example:
val dfe = df.select(
  concat(lit("m"), $"movie").as("movie"),
  concat(lit("a"), $"actor").as("actor"))
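For illustration, the prefixing does nothing more than this plain-Scala transformation, which makes the two ID namespaces disjoint:

```scala
// Prefix movie and actor IDs so that, e.g., movie 1 and actor 1
// no longer collide in a shared vertex namespace.
val raw = Seq((1, 1), (2, 1), (2, 2))
val labeled = raw.map { case (movie, actor) => (s"m$movie", s"a$actor") }

println(labeled) // List((m1,a1), (m2,a1), (m2,a2))
```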
Then you can apply StringIndexer, as explained in How to encode string values into numeric values in Spark DataFrame, or use GraphFrame to automate this:
import graphframes._
val gf = GraphFrame.fromEdges(dfe.toDF("src", "dst"))
Then you can use message passing or graph pattern matching to find two-hop neighbors.
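To make the two-hop idea concrete: in the bipartite graph, two actors are co-stars exactly when some movie vertex links to both of them (with GraphFrames this would be a motif along the lines of `"(m)-[]->(a); (m)-[]->(b)"`, assuming edges point from movie to actor). A plain-Scala sketch of the same pattern:

```scala
// Two-hop neighbours actor <- movie -> actor on a labelled edge list:
// group actors by movie, then emit every ordered co-star pair.
val biEdges = Seq(("m2", "a1"), ("m2", "a2"), ("m3", "a1"), ("m3", "a3"))
val actorsByMovie = biEdges.groupMap(_._1)(_._2)

val coStars = for {
  (_, actors) <- actorsByMovie.toSeq
  a <- actors
  b <- actors if a != b
} yield (a, b)

println(coStars.contains(("a1", "a2"))) // true: both appear in m2
```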