Tags: apache-spark, apache-spark-sql, cassandra, akka, spark-cassandra-connector

Can't get CassandraTableScanRDD with akka


I'm trying to build a REST API with Akka. One of the endpoints returns data from my Cassandra database. I can get my CassandraTableScanRDD like this:

import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

val spark = SparkSession
  .builder()
  .appName("Spark Cassandra")
  .config("spark.cores.max", "5")
  .config("spark.sql.warehouse.dir", "/tmp")
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.connection.port", "9042")
  .master("local[*]")
  .getOrCreate()

val connector = CassandraConnector.apply(spark.sparkContext.getConf)
val sc = spark.sparkContext

// cassandraTable is added to SparkContext by the spark-cassandra-connector implicits
val temp = sc.cassandraTable("scala_firemen", "firemen")

temp.foreach(println)

With this code I can get all the data I need. But once I add my Akka code, I can no longer print or access the data. Even if I roll back and delete the Akka code, I still get the same error, which is:

[error] (run-main-0) java.lang.ExceptionInInitializerError

[error] java.lang.ExceptionInInitializerError

[error] at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

[error] at org.apache.spark.rdd.RDD.first(RDD.scala:1367)

[error] at main$.delayedEndpoint$main$1(Server.scala:34)

[error] at main$delayedInit$body.apply(Server.scala:15)

...

ERROR ContextCleaner: Error in cleaning thread

...

ERROR Utils: uncaught error in thread SparkListenerBus, stopping SparkContext

...

ERROR Utils: throw uncaught fatal error in thread SparkListenerBus

...

When I recreate the project the code works again, so I suspect that I need to clean something after deleting the Akka code for it to work again.


Solution

  • I figured out that I can't have the "com.typesafe.play" %% "play-json" dependency alongside the Spark dependencies:

    "org.apache.spark" %% "spark-sql" % "2.1.1",
    "org.apache.spark" %% "spark-streaming" % "2.1.1",
    "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.10"