This question is a "sequel" to a previous one. I am new to Spark GraphX and Scala, and I was wondering how I can perform the following operation.
How can I merge two graphs into a new graph so that the new graph has the following property:
The attributes of the common edges of the two graphs are averaged (or, more generally, a combining function is applied to the pair of edge attributes, which are of type Double).
We consider an edge common when it has the same srcId and the same dstId in both graphs, and we assume vertices and edges are unique within each graph.
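For concreteness, here is a minimal toy instance of the problem (the vertex labels, edge weights, and the spark-shell SparkContext sc are all hypothetical):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Three vertices shared by both graphs
val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
  (1L, "a"), (2L, "b"), (3L, "c")))

// Both graphs contain the edges (1 -> 2) and (2 -> 3),
// but with different Double attributes
val g1: Graph[String, Double] = Graph(vertices,
  sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0))))
val g2: Graph[String, Double] = Graph(vertices,
  sc.parallelize(Seq(Edge(1L, 2L, 3.0), Edge(2L, 3L, 4.0))))

The merged graph should then contain the edge (1 -> 2) with attribute 2.0 and the edge (2 -> 3) with attribute 3.0.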
Assuming you have only two graphs, and both contain the same set of vertices without duplicate edges, you can simply combine the edges and use the groupEdges method on a new graph:
import org.apache.spark.graphx._

val graph1: Graph[T, Double] = ??? // T is your vertex attribute type
val graph2: Graph[T, Double] = ???

Graph(graph1.vertices, graph1.edges.union(graph2.edges))
  // groupEdges only merges edges that live in the same partition,
  // so the graph has to be partitioned first
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .groupEdges((val1, val2) => (val1 + val2) / 2.0)
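For example, with the hypothetical toy graphs g1 and g2 from the question substituted for graph1 and graph2:

Graph(g1.vertices, g1.edges.union(g2.edges))
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .groupEdges((a, b) => (a + b) / 2.0)
  .edges.collect.foreach(println)
// prints (in some order):
// Edge(1,2,2.0)
// Edge(2,3,3.0)

Note that an edge present in only one of the graphs keeps its original attribute, because the merge function is invoked only when two edges share the same endpoints.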
Or, a little more generally:
Graph(graph1.vertices, graph1.edges.union(graph2.edges))
  // carry a count along with each attribute
  .mapEdges(e => (e.attr, 1.0))
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .groupEdges((val1, val2) => (val1._1 + val2._1, val1._2 + val2._2))
  // divide the summed attributes by the number of merged edges
  .mapEdges(e => e.attr._1 / e.attr._2)
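This version computes a true average no matter how many times an edge occurs, so it also works when an edge appears in only one of the graphs (its sum is divided by a count of 1), and it extends directly to unions of more than two edge sets.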
If that is not enough, you can combine the values yourself and create a new graph from scratch:
// Key each edge by its endpoints
def edgeToPair(e: Edge[Double]) = ((e.srcId, e.dstId), e.attr)

val pairs1 = graph1.edges.map(edgeToPair)
val pairs2 = graph2.edges.map(edgeToPair)

// Combine edges: aggregate a (sum, count) pair per (srcId, dstId)
// key, then divide to get the average
val newEdges = pairs1.union(pairs2)
  .aggregateByKey((0.0, 0.0))(
    (acc, e) => (acc._1 + e, acc._2 + 1.0),
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  ).map { case ((srcId, dstId), (sum, count)) =>
    Edge(srcId, dstId, sum / count) }
// Combine vertices assuming there are no conflicts
// like different labels
val newVertices = graph1.vertices.union(graph2.vertices).distinct
// Create new graph
val newGraph = Graph(newVertices, newEdges)
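Regarding the "no conflicts" assumption: if the two graphs could carry different attributes for the same vertex id, distinct would keep both pairs and the graph constructor would pick one of them arbitrarily. A hypothetical alternative, assuming either attribute is acceptable, is to deduplicate by key explicitly:

// Keep exactly one attribute per vertex id (arbitrary choice)
val dedupedVertices = graph1.vertices.union(graph2.vertices)
  .reduceByKey((a, b) => a)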
In the edge combination above, aggregateByKey can be replaced by groupByKey followed by a map, which is necessary when the combining function needs all values at once, as the median does (see the sketch below).
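For instance, here is a minimal sketch of a median variant, reusing the pairs1 and pairs2 RDDs defined above:

// The median needs every value at once, so group values per key first
def median(xs: Seq[Double]): Double = {
  val sorted = xs.sorted
  val n = sorted.size
  if (n % 2 == 1) sorted(n / 2)
  else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
}

val medianEdges = pairs1.union(pairs2)
  .groupByKey()
  .map { case ((srcId, dstId), attrs) =>
    Edge(srcId, dstId, median(attrs.toSeq)) }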