Tags: apache-spark, dataframe, serialization, gremlin, tinkerpop3

Add Vertices to a TinkerPop graph (Gremlin) by applying a function to a DataFrame


As the question indicates, I have been trying for quite some time to write code that reads through a DataFrame and adds vertices to a Gremlin graph, with properties extracted from the DataFrame. For that purpose I wrote the following code:

import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph

val graph = TinkerGraph.open()

val g = graph.traversal()

// addVertex takes key/value pairs, so field1 is used as the property key
// and field2 as its value
def myFunction(field1: String, field2: String) = {
    graph.addVertex(field1, field2)
}


// .toDF needs the SQL implicits in scope (import sqlContext.implicits._ in Spark 1.x)
val df = List(
  (1,"A","X",1),
  (2,"B","X",2),
  (3,"B","X",3),
  (4,"D","X",4),
  (5,"E","X",5),
  (6,"A","Y",1),
  (7,"C","Y",2)
).toDF("id","value","group","ts")


// This is the call that fails: the closure passed to map captures `graph`
df.map(row => myFunction("id1", row.getAs[String]("value")))

The problem is I keep getting the same error:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2085)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.map(RDD.scala:323)
    at org.apache.spark.sql.DataFrame.map(DataFrame.scala:1425)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57

I have read some questions and answers here, and I came to the conclusion that the problem is that my function captures something (graph and g) that is not serializable, so Spark cannot ship the closure to the executors and the task fails.
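To illustrate the diagnosis (a minimal sketch, not a real fix): if the graph is created inside each partition, the closure captures nothing non-serializable and the error disappears, but every executor then builds its own short-lived in-memory graph, so the vertices never end up in one place:

df.foreachPartition { rows =>
  // Created on the executor, so nothing needs to be serialized...
  val localGraph = TinkerGraph.open()
  rows.foreach { row =>
    localGraph.addVertex("field1", "id1", "field2", row.getAs[String]("value"))
  }
  // ...but localGraph only lives for the duration of this task
}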

What can I do to avoid this error? I have tried creating an object, defining my function inside it (as you can see below) and calling it from outside as test1.myFunction, but it still didn't work.

object test1 {

  val graph = TinkerGraph.open()

  val g = graph.traversal()

  def myFunction(field1: String, field2: String) = {
    graph.addVertex(field1, field2)
  }
}

Solution

  • Based on the discussion in the comments, here is a working example for the DataStax Graph TinkerPop implementation, sketched from the TinkerPop driver documentation. Each Spark partition is processed on one of the remote executors, so you should connect to the remote TinkerPop server inside the foreachPartition call and send the data to it from there.

    import org.apache.tinkerpop.gremlin.driver.{Client, Cluster}
    import scala.collection.JavaConverters._

    df.foreachPartition { rows =>
      // One connection per partition, opened on the executor itself
      val cluster = Cluster.open()
      val client = cluster.connect()
      for (row <- rows) {
        val params = Map[String, AnyRef](
          "field1" -> "id1",
          "field2" -> row.getAs[String]("value"))
        // The bindings become variables inside the remote Gremlin script
        client.submit("graph.addVertex(field1, field2)", params.asJava).all().get()
      }
      client.close()
      cluster.close()
    }
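    A note on the pattern above: opening the Cluster inside foreachPartition means each executor manages its own connection, and passing values as bindings (rather than concatenating them into the script string) lets Gremlin Server reuse the compiled script.

    If the DataFrame is small enough to fit on the driver, a simpler alternative (a sketch under that assumption, reusing the local graph from the question) is to collect the rows and mutate the graph driver-side, so nothing needs to be serialized at all:

    // Driver-side sketch: only viable when df fits in driver memory
    df.collect().foreach { row =>
      graph.addVertex("field1", "id1", "field2", row.getAs[String]("value"))
    }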