Tags: scala, apache-spark, cassandra, cassandra-3.0, spark-cassandra-connector

Querying Cassandra data using Spark SQL in Scala


I am trying to query Cassandra data using Spark SQL in Scala.

    import com.datastax.spark.connector._  
    import org.apache.spark.SparkContext 
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf

    val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host","**.*.**.***")
    .set("spark.cassandra.auth.username","****")
    .set("spark.cassandra.auth.password","****")   
    val sc = new SparkContext(conf)

    import org.apache.spark.sql._
    val sqlContext = new SQLContext(sc)
    sqlContext.sql("SELECT * FROM energydata.demodata")  

And it throws this error:

org.apache.spark.sql.AnalysisException: Table or view not found: energydata.demodata; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation energydata.demodata

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:82)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
  at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:691)
  ... 54 elided

I just want to read the table data without disturbing the Cassandra table. I tried the solution given here, which adds a hive-site.xml file to spark/conf. But when I add it to spark/conf, Spark no longer seems to work properly:

        at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(SparkSession.scala:938)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(SparkSession.scala:938)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:938)
        at org.apache.spark.repl.Main$.createSparkSession(Main.scala:97)
        at $line3.$read$$iw$$iw.<init>(<console>:15)
        at $line3.$read$$iw.<init>(<console>:42)
        at $line3.$read.<init>(<console>:44)
        at $line3.$read$.<init>(<console>:48)
        at $line3.$read$.<clinit>(<console>)
        at $line3.$eval$.$print$lzycompute(<console>:7)
        at $line3.$eval$.$print(<console>:6)
        at $line3.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
        at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
        at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)
        at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:98)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:920)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
        at org.apache.spark.repl.Main$.doMain(Main.scala:70)
        at org.apache.spark.repl.Main$.main(Main.scala:53)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
        at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
        at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)
        ... 87 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
        at java.lang.reflect.Constructor.newInstance(Unknown Source)
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
        ... 93 more
Caused by: java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:$%7Btest.warehouse.dir%7D
        at org.apache.hadoop.fs.Path.initialize(Path.java:205)
        at org.apache.hadoop.fs.Path.<init>(Path.java:196)
        at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:141)
        at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:146)
        at org.apache.hadoop.hive.metastore.Warehouse.getWhRoot(Warehouse.java:159)
        at org.apache.hadoop.hive.metastore.Warehouse.getDefaultDatabasePath(Warehouse.java:177)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB_core(HiveMetaStore.java:600)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
        at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
        at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
        ... 98 more
Caused by: java.net.URISyntaxException: Relative path in absolute URI: file:$%7Btest.warehouse.dir%7D
        at java.net.URI.checkPath(Unknown Source)
        at java.net.URI.<init>(Unknown Source)
        at org.apache.hadoop.fs.Path.initialize(Path.java:202)
        ... 111 more
17/07/26 11:40:06 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
  at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1053)
  at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:130)
  at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:130)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:129)
  at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
  at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(SparkSession.scala:938)
  at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(SparkSession.scala:938)
  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
  at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
  at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
  at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:938)
  at org.apache.spark.repl.Main$.createSparkSession(Main.scala:97)
  ... 47 elided
Caused by: org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
  at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:193)
  at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:105)
  at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:93)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:39)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSessionStateBuilder.scala:54)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:52)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:35)
  at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:289)
  at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1050)
  ... 61 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
  at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
  at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:191)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
  at java.lang.reflect.Constructor.newInstance(Unknown Source)
  at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:264)
  at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:362)
  at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:266)
  at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:66)
  at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:65)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:194)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:194)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:194)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
  ... 70 more
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
  at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
  at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
  at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
  at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
  ... 84 more
Caused by: java.lang.reflect.InvocationTargetException: java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:$%7Btest.warehouse.dir%7D
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
  at java.lang.reflect.Constructor.newInstance(Unknown Source)
  at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
  ... 90 more
Caused by: java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:$%7Btest.warehouse.dir%7D
  at org.apache.hadoop.fs.Path.initialize(Path.java:205)
  at org.apache.hadoop.fs.Path.<init>(Path.java:196)
  at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:141)
  at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:146)
  at org.apache.hadoop.hive.metastore.Warehouse.getWhRoot(Warehouse.java:159)
  at org.apache.hadoop.hive.metastore.Warehouse.getDefaultDatabasePath(Warehouse.java:177)
  at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB_core(HiveMetaStore.java:600)
  at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
  at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
  at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
  at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
  at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
  at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
  at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
  ... 95 more
Caused by: java.net.URISyntaxException: Relative path in absolute URI: file:$%7Btest.warehouse.dir%7D
  at java.net.URI.checkPath(Unknown Source)
  at java.net.URI.<init>(Unknown Source)
  at org.apache.hadoop.fs.Path.initialize(Path.java:202)
  ... 108 more
<console>:14: error: not found: value spark
       import spark.implicits._
              ^
<console>:14: error: not found: value spark
       import spark.sql
              ^
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

I am using Scala 2.12.2, Java 1.8.0, and Cassandra 3.1.1. Is there any other way I can write a SQL query in Scala?

Thank you.


Solution

  • From the imports I understand that you're using the spark-cassandra-connector. Its version compatibility section mentions that the latest version of the connector supports Scala 2.10 and 2.11, and Cassandra 2.1.5*, 2.2 and 3.0 with Spark 2.0 and 2.1.

    So I'd suggest downgrading your Scala and Cassandra versions to match, and checking whether that works; a dependency sketch follows below.
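
    For reference, here is a minimal, hypothetical build.sbt sketch within that compatibility range. The exact version numbers are illustrative only; check the connector's compatibility table for your setup.

    // Illustrative build.sbt: Scala 2.11 with Spark 2.1 and a matching connector release.
    // Verify these coordinates against the spark-cassandra-connector compatibility matrix.
    scalaVersion := "2.11.8"

    libraryDependencies ++= Seq(
      "org.apache.spark"   %% "spark-sql"                 % "2.1.1" % "provided",
      "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.3"
    )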

    Next, I'd suggest changing the way you're trying to access the tables. DataStax provides a dedicated API for connecting Spark to Cassandra; you can find the relevant documentation here.

    With Spark 2.x, you can do something like this:

    import org.apache.spark.sql.SparkSession

    // Build a SparkSession that carries the Cassandra connection settings.
    val spark = SparkSession.builder()
      .appName("CassTest")
      .master("local[2]")
      .config("spark.cassandra.connection.host", "**.*.**.***")
      .config("spark.cassandra.auth.username", "****")
      .config("spark.cassandra.auth.password", "****")
      .getOrCreate()

    import spark.implicits._

    // Read a Cassandra table as a DataFrame through the connector's data source.
    val df = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "words", "keyspace" -> "test"))
      .load()
    

    Finally, you can call df.show to see the contents.

    NOTE: The hive-site.xml fix you tried is for connecting Hive to some globally accessible metastore, which is itself a different data store, so it will not work for Cassandra.
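
    If you still want to write plain SQL rather than use the DataFrame API, one option is to register the Cassandra-backed DataFrame as a temporary view and query it through spark.sql. A minimal sketch reusing the df from above; the view name is arbitrary:

    // Register the Cassandra-backed DataFrame so Spark SQL can resolve it by name.
    df.createOrReplaceTempView("words")

    // Plain SQL now works against the registered view.
    spark.sql("SELECT * FROM words").show()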

    Let me know if this helped. Cheers.