Tags: scala, apache-spark, spark-graphx

Structural Operators between graphs


This question is a "sequel" to a previous one. I am new to Spark GraphX and Scala, and I was wondering how I can perform the operation below.

How can I merge two graphs into a new graph so that the new graph has the following property:

The attributes of the edges common to both graphs are averaged (or, more generally, combined by applying some aggregation function to the edge attributes; the attributes are of type Double).

We consider an edge common when it has the same srcId and the same dstId in both graphs, and we assume vertices and edges are unique within each graph.
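
For example, with two toy graphs like these (the data is invented for illustration, and sc is assumed to be a live SparkContext):

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // Both graphs share the edge 1 -> 2 but with different attributes,
    // so the merged graph should carry (3.0 + 5.0) / 2 = 4.0 on it
    val vertices: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))

    val graph1 = Graph(vertices, sc.parallelize(Seq(
      Edge(1L, 2L, 3.0), Edge(2L, 3L, 4.0))))

    val graph2 = Graph(vertices, sc.parallelize(Seq(
      Edge(1L, 2L, 5.0))))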


Solution

  • Assuming you have only two graphs, both containing the same set of vertices and no duplicate edges, you can simply combine the edges and call the groupEdges method on a new graph:

    import org.apache.spark.graphx._

    val graph1: Graph[T, Double] = ???
    val graph2: Graph[T, Double] = ???

    // groupEdges only merges edges colocated on the same partition,
    // so repartition first so that identical (srcId, dstId) pairs meet
    Graph(graph1.vertices, graph1.edges.union(graph2.edges))
      .partitionBy(PartitionStrategy.RandomVertexCut)
      .groupEdges((val1, val2) => (val1 + val2) / 2.0)
    

    or, a bit more generally:

    Graph(graph1.vertices, graph1.edges.union(graph2.edges))
      .partitionBy(PartitionStrategy.RandomVertexCut)
      .mapEdges(e => (e.attr, 1.0))
      .groupEdges((val1, val2) => (val1._1 + val2._1, val1._2 + val2._2))
      .mapEdges(e => e.attr._1 / e.attr._2)
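
    With the toy graphs sketched in the question, either variant averages the shared edge 1 -> 2 to 4.0 and leaves 2 -> 3 untouched; assuming the result is bound to a val named merged, a quick check:

    merged.edges.collect.foreach(println)
    // prints, in some order:
    // Edge(1,2,4.0)
    // Edge(2,3,4.0)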
    

    If that is not enough, you can combine the values yourself and create a new graph from scratch:

    def edgeToPair(e: Edge[Double]) = ((e.srcId, e.dstId), e.attr)
    val pairs1 = graph1.edges.map(edgeToPair)
    val pairs2 = graph2.edges.map(edgeToPair)
    
    // Combine edges
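    // aggregateByKey folds every attribute for a (srcId, dstId) key into
    // a (sum, count) accumulator, then merges the partial accumulators
    // across partitions, so the average can be computed in one pass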
    val newEdges = pairs1.union(pairs2)
      .aggregateByKey((0.0, 0.0))(
        (acc, e) => (acc._1 + e, acc._2 + 1.0),
        (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
      ).map{case ((srcId, dstId), (acc, count)) => Edge(srcId, dstId, acc / count)}
    
    // Combine vertices assuming there are no conflicts
    // like different labels
    val newVertices = graph1.vertices.union(graph2.vertices).distinct
    
    // Create new graph
    val newGraph = Graph(newVertices, newEdges)
    

    where aggregateByKey can be replaced by groupByKey followed by a mapping, if the aggregation requires all values at once (for example, a median).
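
    For instance, a minimal sketch of such a median variant; the medianOf helper is invented here for illustration and assumes the values for a single key fit in memory:

    // Hypothetical helper: median of a small in-memory sequence
    def medianOf(xs: Seq[Double]): Double = {
      val sorted = xs.sorted
      val n = sorted.length
      if (n % 2 == 1) sorted(n / 2)
      else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
    }

    // Same pairs1/pairs2 as above, but collect all attributes per key
    val medianEdges = pairs1.union(pairs2)
      .groupByKey()
      .map { case ((srcId, dstId), attrs) =>
        Edge(srcId, dstId, medianOf(attrs.toSeq))
      }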