scala, apache-spark, spark-graphx

spark graphx multiple edge types


I have started using Spark very recently. Currently I am testing it on a bipartite graph that has different vertex and edge types.

From the research I did on GraphX, it seems that to have different edge types, some carrying properties, I need to subclass a common edge type.

Here is a snippet of the code:

scala> trait VertexProperty
defined trait VertexProperty

scala> case class paperProperty(val paperid: Long, val papername: String, val doi: String, val keywords: String) extends VertexProperty
defined class paperProperty

scala> case class authorProperty(val authorid: Long, val authorname: String) extends VertexProperty
defined class authorProperty

scala> val docsVertces: RDD[(VertexId, VertexProperty)] = docs.rdd.map(x => (x(0).asInstanceOf[VertexId],paperProperty(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[String],x(2).asInstanceOf[String],x(3).asInstanceOf[String])))
docsVertces: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, VertexProperty)] = MapPartitionsRDD[23] at map at <console>:47

scala> val authorVertces: RDD[(VertexId, VertexProperty)] = authors.rdd.map(x => (x(0).asInstanceOf[VertexId],authorProperty(x(0).asInstanceOf[Long],x(1).asInstanceOf[String])))
authorVertces: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, VertexProperty)] = MapPartitionsRDD[24] at map at <console>:41

scala> val vertices = VertexRDD(docsVertces ++ authorVertces)
vertices: org.apache.spark.graphx.VertexRDD[VertexProperty] = VertexRDDImpl[28] at RDD at VertexRDD.scala:57

scala>

However, I am failing with the edges.

scala> class EdgeProperty()
defined class EdgeProperty

scala> case class authorEdgeProperty(val doccount: Long) extends EdgeProperty()
defined class authorEdgeProperty

scala> case class citeEdgeProperty() extends EdgeProperty()
defined class citeEdgeProperty

scala> // edges defined with different subclasses will not work; we need one consistent superclass

scala> val docauthoredges = docauthor.map(x => Edge(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[VertexId], authorEdgeProperty(x(1).asInstanceOf[Long])))
docauthoredges: org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[authorEdgeProperty]] = [srcId: bigint, dstId: bigint ... 1 more field]

scala> val docciteedges = doccites.map(x => Edge(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[VertexId], citeEdgeProperty()))
docciteedges: org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[citeEdgeProperty]] = [srcId: bigint, dstId: bigint ... 1 more field]

scala> docauthoredges.unionAll(docciteedges)
<console>:52: error: type mismatch;
 found   : org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[citeEdgeProperty]]
 required: org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[authorEdgeProperty]]
       docauthoredges.unionAll(docciteedges)
                               ^

scala>

I attempted to cast the edges to their superclass and am receiving the following message:

scala> val docauthoredges = docauthor.map(x => Edge(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[VertexId], authorEdgeProperty(x(1).asInstanceOf[Long]).asInstanceOf[EdgeProperty]))
java.lang.UnsupportedOperationException: No Encoder found for EdgeProperty
- field (class: "EdgeProperty", name: "attr")
- root class: "org.apache.spark.graphx.Edge"
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:598)
...

Any help will be greatly appreciated.


Solution

  • Your question is a bit moot, because GraphX doesn't support Datasets: both edges and vertices have to be passed as RDDs. But for the sake of argument:

    • You get the first error (the type mismatch) because distributed data structures in Spark are invariant: a Dataset[Edge[authorEdgeProperty]] is not a Dataset[Edge[EdgeProperty]]. Don't use asInstanceOf; just be explicit with type annotations.
    • You get the second exception because Datasets are further limited by the use of Encoders. All objects in a Dataset have to use the same Encoder, which in this case is possible only with a binary (Kryo) encoder, and that one is not implicitly available for user-defined classes.

    With these two pieces combined:

    import org.apache.spark.sql.{Dataset, Encoders}
    
    sealed trait EdgeProperty
    
    case class AuthorEdgeProperty(doccount: Long) extends EdgeProperty
    case class CiteEdgeProperty() extends EdgeProperty
    
    // Ascribe the common supertype inside map and supply a binary Kryo
    // encoder explicitly; no implicit encoder exists for a user-defined trait.
    val docauthoredges: Dataset[EdgeProperty] = spark.range(10)
      .map(AuthorEdgeProperty(_): EdgeProperty)(Encoders.kryo[EdgeProperty])
    
    val docciteedges: Dataset[EdgeProperty] = spark.range(5)
      .map(_ => CiteEdgeProperty(): EdgeProperty)(Encoders.kryo[EdgeProperty])
    
    // Both Datasets now share the element type EdgeProperty, so union works.
    val edges: Dataset[EdgeProperty] = docauthoredges.union(docciteedges)
    

    Convert to RDD to make it usable in GraphX:

    edges.rdd
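
    For completeness, here is a minimal sketch of how this plugs into GraphX, assuming the sealed EdgeProperty hierarchy above together with the asker's VertexProperty classes; the names vertexRDD/edgeRDD and the sample rows are made up for illustration:

    import org.apache.spark.graphx.{Edge, Graph, VertexId}
    import org.apache.spark.rdd.RDD
    
    // Made-up sample data standing in for the papers/authors tables.
    val vertexRDD: RDD[(VertexId, VertexProperty)] = spark.sparkContext.parallelize(Seq(
      (1L, paperProperty(1L, "Paper A", "10.1000/1", "graphs")),
      (2L, paperProperty(2L, "Paper B", "10.1000/2", "spark")),
      (3L, authorProperty(3L, "An Author"))
    ))
    
    val edgeRDD: RDD[Edge[EdgeProperty]] = spark.sparkContext.parallelize(Seq(
      Edge(1L, 3L, AuthorEdgeProperty(1L): EdgeProperty), // paper -> author
      Edge(1L, 2L, CiteEdgeProperty(): EdgeProperty)      // paper -> cited paper
    ))
    
    val graph: Graph[VertexProperty, EdgeProperty] = Graph(vertexRDD, edgeRDD)
    
    // Pattern matching recovers the concrete edge type downstream:
    graph.edges.filter {
      case Edge(_, _, AuthorEdgeProperty(_)) => true
      case _                                 => false
    }.count

    Because EdgeProperty is sealed, match expressions over the edge attribute are checked for exhaustiveness by the compiler, which is what you want when mixing several edge types in one graph.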