I am trying to read from and write to Cassandra from Spark, using these dependencies:
"com.datastax.spark" % "spark-cassandra-connector-unshaded_2.11" % "2.0.0-M3",
"com.datastax.cassandra" % "cassandra-driver-core" % "3.0.0"
And this is the code:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._

val sparkConf: SparkConf = new SparkConf().setAppName(appName)
  .set("spark.cassandra.connection.host", hostname)
  .set("spark.cassandra.auth.username", user)
  .set("spark.cassandra.auth.password", password)

val spark = SparkSession.builder().config(sparkConf).getOrCreate()

val df = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> s"$TABLE", "keyspace" -> s"$KEYSPACE"))
  .load()
However, when I run spark-submit, the job fails at the .load() call above with:
Exception in thread "main" java.lang.NullPointerException
at com.datastax.driver.core.Cluster$Manager.close(Cluster.java:1516)
at com.datastax.driver.core.Cluster$Manager.access$200(Cluster.java:1237)
at com.datastax.driver.core.Cluster.closeAsync(Cluster.java:540)
at com.datastax.driver.core.Cluster.close(Cluster.java:551)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:82)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.rdd.partitioner.dht.TokenFactory$.forSystemLocalPartitioner(TokenFactory.scala:98)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:255)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:55)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:345)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
2.0.0-M3 is a milestone release; you should use an actual release, such as 2.0.2, which is currently the latest.
https://github.com/datastax/spark-cassandra-connector#most-recent-release-scala-docs
You should not include the Java driver in the same project as the Cassandra Connector, unless you are explicitly re-shading inside your project, which is for experts only. See the FAQ for more details.
I would recommend using only the shaded artifact and following the examples posted here:
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
"com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion % "provided"
Then launch either with Spark Packages or with an assembly jar:
// Assembly
https://github.com/datastax/SparkBuildExamples#sbt
// Packages
https://spark-packages.org/package/datastax/spark-cassandra-connector
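With the Packages route, spark-submit resolves the connector (and its transitive dependencies) at launch time via --packages, so you don't bundle it into your own jar at all. A minimal sketch, where the class name and jar name are placeholders for your application:

```shell
# Resolve the shaded connector from Spark Packages at submit time.
# com.example.MyApp and my-app.jar are illustrative placeholders.
spark-submit \
  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 \
  --conf spark.cassandra.connection.host=<cassandra-host> \
  --class com.example.MyApp \
  my-app.jar
```

Because the Spark and connector dependencies are marked "provided" in the build above, this launch command is the only place the connector version needs to appear.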