I'm fairly new to Scala and am trying to build a Spark job. I've built a job that uses the DataStax connector and assembled it into a fat JAR. When I try to execute it, it fails with a java.lang.NoSuchMethodError. I've cracked open the JAR and can see that the DataStax library is included. Am I missing something obvious? Is there a good tutorial to look at regarding this process?
Thanks
console
$ spark-submit --class org.bobbrez.CasCountJob ./target/scala-2.11/bobbrez-spark-assembly-0.0.1.jar ks tn
...
Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef;
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
...
build.sbt
name := "soofa-spark"
version := "0.0.1"
scalaVersion := "2.11.7"
// additional libraries
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M3"
libraryDependencies += "com.typesafe" % "config" % "1.3.0"
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
    case m if m.startsWith("META-INF") => MergeStrategy.discard
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*) => MergeStrategy.first
    case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
    case "about.html" => MergeStrategy.rename
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  }
}
CasCountJob.scala
package org.bobbrez
// Spark
import org.apache.spark.{SparkContext, SparkConf}
import com.datastax.spark.connector._
object CasCountJob {
  private val AppName = "CasCountJob"

  def main(args: Array[String]) {
    println("Hello world from " + AppName)
    val keyspace = args(0)
    val tablename = args(1)
    println("Keyspace: " + keyspace)
    println("Table: " + tablename)

    // Configure and create a Scala Spark Context.
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "HOSTNAME")
      .set("spark.cassandra.auth.username", "USERNAME")
      .set("spark.cassandra.auth.password", "PASSWORD")
      .setAppName(AppName)
    val sc = new SparkContext(conf)

    val rdd = sc.cassandraTable(keyspace, tablename)
    println("Table Count: " + rdd.count)
    System.exit(0)
  }
}
The Cassandra connector for Spark 1.6 is still in development and has not been released yet.
To integrate Cassandra with Spark, you need at least the following dependencies:
The mapping between compatible versions of the Cassandra libraries and Spark is given here
Apparently the Cassandra connector for Spark 1.5 is also still in development, so you may see some compatibility issues. The most stable release of the Cassandra connector is the one for Spark 1.4, which requires the following JAR files:
Needless to say, all of these JAR files must be configured and available to the executors.
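As a rough illustration of keeping the versions aligned, a build.sbt along these lines might be a starting point. This is only a sketch: the version numbers (Spark 1.4.1, connector 1.4.0, Scala 2.10.5) are my assumptions, so check them against the compatibility table and against what is actually installed on your cluster. Also note that scala.runtime.ObjectRef.zero() from the stack trace only exists in Scala 2.11 and later, so the Scala version used to build the job has to match the Scala version your Spark installation was built with (pre-built Spark 1.x distributions typically use Scala 2.10).

```scala
// build.sbt (sketch; all version numbers are assumptions, verify before use)
name := "soofa-spark"

version := "0.0.1"

// Assumed to match the Scala version of the Spark installation on the cluster
scalaVersion := "2.10.5"

// Spark itself is supplied by the cluster at runtime, hence "provided"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1" % "provided"

// Connector release assumed to target Spark 1.4; not "provided", so it is bundled
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0"
```

Because the connector is not marked "provided", sbt assembly bundles it into the fat JAR, and spark-submit ships that JAR to the executors together with the job. With a 2.10 scalaVersion the assembled JAR ends up under target/scala-2.10/ rather than target/scala-2.11/.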