Tags: spark-streaming, titan, serializable

Insert data into TitanDB using Spark (or Spark Streaming)


I am trying to add elements to TitanDB using Spark Streaming (collecting messages from a Kafka queue), but it seems to be harder than expected. Here is the definition of the Titan connection:

val confPath: String = "titan-cassandra-es-spark.properties"
val conn: TitanModule = new TitanModule(confPath) 

TitanModule is a Serializable class that configures the TitanDB connection:

...
val configurationFilePath: String = confFilePath
val configuration = new PropertiesConfiguration(configurationFilePath)
val gConn: TitanGraph = TitanFactory.open(configuration)
...
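
For reference, a minimal, self-contained sketch of a class with this shape is below; the elided parts (imports, the exact class body) are assumptions rather than the actual code:

import org.apache.commons.configuration.PropertiesConfiguration
import com.thinkaurelius.titan.core.{TitanFactory, TitanGraph}

// Minimal sketch only; the real class contains more than this.
class TitanModule(confFilePath: String) extends Serializable {
  val configurationFilePath: String = confFilePath
  // PropertiesConfiguration is not Serializable, which is what the stack
  // trace below complains about when the instance is shipped to workers.
  val configuration = new PropertiesConfiguration(configurationFilePath)
  val gConn: TitanGraph = TitanFactory.open(configuration)
}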

When I execute the Spark Streaming job that collects messages (JSON) from a Kafka queue, it receives the messages, but as soon as it tries to add one to TitanDB it fails with the following stack trace.

Does anyone know whether adding data to TitanDB is feasible with Spark Streaming? And what could the solution for this be?

18:03:50,596 ERROR JobScheduler:95 - Error running job streaming job 1464624230000 ms.0
org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
        at salvob.SparkConsumer$$anonfun$main$1.apply(SparkConsumer.scala:200)
        at salvob.SparkConsumer$$anonfun$main$1.apply(SparkConsumer.scala:132)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.commons.configuration.PropertiesConfiguration
Serialization stack:
        - object not serializable (class: org.apache.commons.configuration.PropertiesConfiguration, value: org.apache.commons.configuration.PropertiesConfiguration@2cef9ce8)
        - field (class: salvob.TitanModule, name: configuration, type: class org.apache.commons.configuration.PropertiesConfiguration)
        - object (class salvob.TitanModule, salvob.TitanModule@20d984db)
        - field (class: salvob.SparkConsumer$$anonfun$main$1$$anonfun$apply$3, name: conn$1, type: class salvob.TitanModule)
        - object (class salvob.SparkConsumer$$anonfun$main$1$$anonfun$apply$3, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 28 more

Solution

  • Spark Streaming produces RDDs. Processing of the data inside those RDDs happens on the worker nodes. The code you write inside rdd.map() is serialised, together with the objects referenced inside that block, and sent to the worker nodes for processing.

    So the ideal way to use the graph instance with Spark is the following:

    streamRdd.map(kafkaTuple => {
      // open a graph instance on the worker, inside the closure
      val graph: TitanGraph = TitanFactory.open(confPath)
      // use the graph instance to add / modify the graph
      // ...
      // close the graph instance when done
      graph.close()
    })
    

    But this will create a new graph instance for each row. As an optimisation, you can create the graph instance once per partition:

    rdd.foreachPartition((rddRows: Iterator[kafkaTuple]) => {
      // one graph instance and one transaction per partition
      val graph: TitanGraph = TitanFactory.open(confPath)
      val trans: TitanTransaction = graph.newTransaction()

      rddRows.foreach(row => {
        // do the graph insertion for this row inside the above transaction
      })

      trans.commit()
      graph.close()
    })
    

    graph.newTransaction() here helps with multi-threaded graph updates; otherwise you will get lock exceptions.

    The only caveat is that, from what I have read so far, there is no direct support for multi-node updates. From what I saw, a Titan transaction updates HBase with a lock whenever it tries to modify a vertex, so other partitions will fail when they try to do any updates. You will have to build an external synchronisation mechanism, or repartition your RDD into a single partition and then use the above code to do the updates; a rough sketch of that wiring is shown below.
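
    The sketch below shows how the pieces could be wired end to end. It assumes the Spark 1.x Kafka direct-stream API, and the names ssc, kafkaParams and topics are placeholders that do not come from the original code; treat it as an illustration under those assumptions rather than a drop-in implementation.

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils
    import com.thinkaurelius.titan.core.{TitanFactory, TitanGraph, TitanTransaction}

    // ssc (StreamingContext), kafkaParams (Map[String, String]) and topics
    // (Set[String]) are assumed to be defined elsewhere.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // Collapse to a single partition to avoid concurrent lock contention on
      // the storage backend, as discussed above; drop repartition(1) if you
      // add your own synchronisation instead.
      rdd.repartition(1).foreachPartition { messages =>
        // The graph is opened on the worker, so nothing non-serializable has
        // to cross the driver/worker boundary.
        val graph: TitanGraph = TitanFactory.open("titan-cassandra-es-spark.properties")
        val trans: TitanTransaction = graph.newTransaction()

        messages.foreach { case (_, json) =>
          // parse json and create vertices / edges on trans here
        }

        trans.commit()
        graph.close()
      }
    }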