scala, cassandra, apache-spark-sql, spark-cassandra-connector

Using SBT to build scala app - java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra


I am trying to build my first Spark and Cassandra app using sbt.

Here is the code from the .scala file:

/* SimpleApp.scala */
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnectorConf
import com.datastax.spark.connector.rdd.ReadConf
import com.datastax.driver.core.utils.UUIDs
object SimpleApp {


def main(args: Array[String]) {
//val logFile = "/home/goutham/derby.log" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
//val logData = sc.textFile(logFile, 2).cache()
//val numAs = logData.filter(line => line.contains("a")).count()
//val numBs = logData.filter(line => line.contains("b")).count()
//println(s"Lines with a: $numAs, Lines with b: $numBs")


val timeUUID = udf(() => UUIDs.timeBased().toString)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlcontext.read.format("com.databricks.spark.csv")
  .option("wholeFile", "true")
  .option("header", "true")
  .option("parserLib", "UNIVOCITY")
  .option("quote", "\"")
  .option("inferSchema", "true")
  .option("escape", "\"")
  .option("quoteMode", "ALL")
  .load("/home/goutham/Work/data/user.csv")
  .withColumn("user_uuid", timeUUID())

df.createOrReplaceTempView("source_user")


val num = df.count()

println(s" Number of records to be proccessed in the file is  $num")



sqlcontext.sql("""CREATE TEMPORARY VIEW Dest_user
 |USING org.apache.spark.sql.cassandra
 |OPTIONS (
 |  table "t_user",
 |  keyspace "ks_payu",
 |  cluster "Test Cluster",
 |  pushdown "true"
 |)""".stripMargin)`




val df_oldrecordsUpdate = sqlcontext.sql("""Select dest.user_uuid,
 dest.user_id,
 dest.account_manager_id,
 dest.address,
 dest.address_city,
 dest.address_line_2,
 dest.address_line_3,
 dest.affiliate,
 dest.api_key,
 dest.api_login,
 dest.api_version,
 dest.bcash_account,
 dest.bcash_consumer_key,
 dest.bcash_customer_id,
 dest.bcash_email,
 dest.bcash_token,
 dest.valid_from_date,
 current_timestamp() valid_to_date, 
 0 active_flag from source_user source inner join Dest_user dest on source.usuario_id = dest.user_id""")

The following is the build.sbt file used:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2"

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0"

libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2"

**Error 1**

    Number of records to be processed in the file is  10
17/04/12 16:24:08 INFO SparkSqlParser: Parsing command: CREATE TEMPORARY VIEW Dest_user
     USING org.apache.spark.sql.cassandra
     OPTIONS (
     table "t_user",
     keyspace "ks_payu",
     cluster "Test Cluster",
     pushdown "true")
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
    at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
    at org.apache.spark.sql.execution.datasources.CreateTempViewUsing.run(ddl.scala:82)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682)
    at SimpleApp$.main(simpleApp.scala:61)
    at SimpleApp.main(simpleApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
    ... 31 more

**Error 2**

java.lang.NoClassDefFoundError: scala/runtime/AbstractPartialFunction$mcJL$sp
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(CassandraLimit.scala:21)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:367)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: scala

Solution

  • You are using the wrong Cassandra connector: the application is built with Scala 2.11, but the connector on the classpath was built for Scala 2.10. Submit the job with the 2.11 build of the connector instead:

    spark-submit --packages datastax:spark-cassandra-connector:2.0.0-s_2.11 --class "SimpleApp" --master local[4] target/scala-2.11/simple-project_2.11-1.0.jar
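  • Alternatively, bundle the connector into the application jar so that spark-submit does not need the --packages flag. The following is only a minimal sketch, assuming the sbt-assembly plugin is used (the plugin version below is an assumption; pick one compatible with your sbt release):

    // project/plugins.sbt -- assumed sbt-assembly version, adjust as needed
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

    // build.sbt -- mark Spark itself as "provided" so it is not packaged,
    // and keep the connector in the default (compile) scope so it lands in the fat jar
    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.8"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2" % "provided"
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.2" % "provided"
    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0"

    Run sbt assembly and pass the resulting jar from target/scala-2.11/ to spark-submit. Because %% resolves the _2.11 artifact of the connector, this also avoids the Scala 2.10/2.11 mismatch behind the second error.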