Before i go ahead and explain the question can anyone please tell me the difference between sparkSQL
and CassandraSQLContext
?
I am trying to run a scala code(don't want to create jar for testing purpose) on spark-cassandra cluster. So, i have the following code which does some basic query on cassandra. But every time i run the code i get the following error :
Java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
Even though i have mentioned for the same in build.sbt. Moreover, i have tried to give the explicit path of the connector(in the scala code using sc.addJar
or using Sparkconf.Set()
) which i have created separately. Still it doesn't work.
FYI, i am using spark-1.6, cassandra-2.1.12 and scala-2.10
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.cassandra.CassandraSQLContext
object SimpleApp {
def main(args: Array[String]) {
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "172.16.4.196").set("spark.executor.extraClassPath", "/home/naresh/Desktop/Spark-CassandraWork/spark-cassandra-connector_1.6/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.6.0-M1.jar")
//sc.addJar("/home/naresh/Desktop/Spark-CassandraWork/spark-cassandra-connector_1.6/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.6.0-M1.jar")
val sc = new SparkContext("spark://naresh-pc:7077", "test", conf)
val csc = new CassandraSQLContext(sc)
csc.setKeyspace("cw")
val rdd = csc.sql("SOME_QUERY")
rdd.collect().foreach(a => println(a))
}
}
Build.sbt:
name := "SparkCassandraDemo"
version := "1.0"
scalaVersion := "2.11.8"
val sparkDependencies = Seq(
"org.apache.spark" %% "spark-core" % "1.6.1",
"org.apache.spark".%%("spark-sql") % "1.6.1",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0-M2"
)
lazy val sparkDebugger = (project in file("spark-debugger"))
.settings(
libraryDependencies ++= sparkDependencies.map(_ % "compile")
)
libraryDependencies ++= sparkDependencies.map(_ % "provided")
Error:
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(cnt#0L ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[useragent#10], functions=[(count(if ((gid#12 = 1)) cookie#13 else null),mode=Final,isDistinct=false)], output=[cnt#0L,useragent#10])
+- TungstenExchange hashpartitioning(useragent#10,200), None
+- TungstenAggregate(key=[useragent#10], functions=[(count(if ((gid#12 = 1)) cookie#13 else null),mode=Partial,isDistinct=false)], output=[useragent#10,count#16L])
+- TungstenAggregate(key=[useragent#10,cookie#13,gid#12], functions=[], output=[useragent#10,cookie#13,gid#12])
+- TungstenExchange hashpartitioning(useragent#10,cookie#13,gid#12,200), None
+- TungstenAggregate(key=[useragent#10,cookie#13,gid#12], functions=[], output=[useragent#10,cookie#13,gid#12])
+- Expand [List(useragent#10, cookie#3, 1)], [useragent#10,cookie#13,gid#12]
+- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@18f60dc[useragent#10,cookie#3]
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.ConvertToUnsafe.doExecute(rowFormatConverters.scala:38)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Sort.doExecute(Sort.scala:64)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$collect$1.apply(DataFrame.scala:1503)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$collect$1.apply(DataFrame.scala:1503)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1503)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1480)
at SimpleApp$.main(SimpleApp.scala:20)
at SimpleApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 4 times, most recent failure: Lost task 2.3 in stage 0.0 (TID 9, pratik-VirtualBox): java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:264)
at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:126)
at org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:179)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 34 more
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The error you are receiving is telling you that the execution context of the job lack's the Spark Cassandra Connector on the runtime Classpath.
The most common way of setting this is using --packages
spark-submit yourjar --packages datastax:spark-cassandra-connector:1.6.0-M2-s_2.10
http://spark-packages.org/package/datastax/spark-cassandra-connector
Note you are also mixing scala versions as well your application build file states
scalaVersion := "2.11.8"