Search code examples
cassandraapache-sparkapache-spark-sqlspark-jobserverspark-cassandra-connector

Error when running job that queries against Cassandra via Spark SQL through Spark Jobserver


So I'm trying to run job that simply runs a query against cassandra using spark-sql, the job is submitted fine and the job starts fine. This code works when it is not being run through spark jobserver (when simply using spark submit). Could someone tell my what is wrong with my job code or configuration files that is causing the error below?

{
  "status": "ERROR",
  "ERROR": {
    "errorClass": "java.util.concurrent.ExecutionException",
    "cause": "Failed to open native connection to Cassandra at {127.0.1.1}:9042",
    "stack": ["com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSes
sion(CassandraConnector.scala:155)", "com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scal
a:141)", "com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:141)", "com.datastax.spark
.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)", "com.datastax.spark.connector.cql.RefCountedCache
.acquire(RefCountedCache.scala:56)", "com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73)
", "com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:101)", "com.datastax.spark.connecto
r.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:112)", "com.datastax.spark.connector.cql.Schema$.fromCassandra(Sch
ema.scala:243)", "org.apache.spark.sql.cassandra.CassandraCatalog$$anon$1.load(CassandraCatalog.scala:22)", "org.apache.spark.sql.
cassandra.CassandraCatalog$$anon$1.load(CassandraCatalog.scala:19)", "com.google.common.cache.LocalCache$LoadingValueReference.loa
dFuture(LocalCache.java:3599)", "com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)", "com.google.common.ca
che.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)", "com.google.common.cache.LocalCache$Segment.get(LocalCache.java:225
7)", "com.google.common.cache.LocalCache.get(LocalCache.java:4000)", "com.google.common.cache.LocalCache.getOrLoad(LocalCache.java
:4004)", "com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)", "org.apache.spark.sql.cassandra.Cassand
raCatalog.lookupRelation(CassandraCatalog.scala:28)", "org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.org$apache$spark
$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(CassandraSQLContext.scala:218)", "org.apache.spark.sql.catalyst.analy
sis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:161)", "org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$
anonfun$lookupRelation$3.apply(Catalog.scala:161)", "scala.Option.getOrElse(Option.scala:120)", "org.apache.spark.sql.catalyst.ana
lysis.OverrideCatalog$class.lookupRelation(Catalog.scala:161)", "org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.lookup
Relation(CassandraSQLContext.scala:218)", "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.sca
la:174)", "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:186)", "or
g.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:181)", "org.apache.spar
k.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)", "org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.appl
y(TreeNode.scala:188)", "org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)", "org.apache.spark.sql.
catalyst.trees.TreeNode.transformDown(TreeNode.scala:187)", "org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNod
e.scala:208)", "scala.collection.Iterator$$anon$11.next(Iterator.scala:328)", "scala.collection.Iterator$class.foreach(Iterator.sc
ala:727)", "scala.collection.AbstractIterator.foreach(Iterator.scala:1157)", "scala.collection.generic.Growable$class.$plus$plus$e
q(Growable.scala:48)", "scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)", "scala.collection.mutable.Arra
yBuffer.$plus$plus$eq(ArrayBuffer.scala:47)", "scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)", "scala.colle
ction.AbstractIterator.to(Iterator.scala:1157)", "scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)", "sc
ala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)", "scala.collection.TraversableOnce$class.toArray(TraversableOnce.sc
ala:252)", "scala.collection.AbstractIterator.toArray(Iterator.scala:1157)", "org.apache.spark.sql.catalyst.trees.TreeNode.transfo
rmChildrenDown(TreeNode.scala:238)", "org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:193)", "org.apache
.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:178)", "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelatio
ns$.apply(Analyzer.scala:181)", "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:171)", "or
g.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)", "org.apache.spark.
sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)", "scala.collection.LinearSeqOptimi
zed$class.foldLeft(LinearSeqOptimized.scala:111)", "scala.collection.immutable.List.foldLeft(List.scala:84)", "org.apache.spark.sq
l.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)", "org.apache.spark.sql.catalyst.rules.RuleExecutor$$a
nonfun$apply$1.apply(RuleExecutor.scala:51)", "scala.collection.immutable.List.foreach(List.scala:318)", "org.apache.spark.sql.cat
alyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)", "org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLCon
text.scala:1082)", "org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:1082)", "org.apache.spark.sql.SQLCont
ext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)", "org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)", "org.apac
he.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:211)", "org.apache.spark.sql.cassandra.Cassandra
SQLContext.sql(CassandraSQLContext.scala:214)", "CassSparkTest$.runJob(CassSparkTest.scala:23)", "CassSparkTest$.runJob(CassSparkT
est.scala:9)", "spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.sca
la:235)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)", "scala.concurrent.impl.Future$P
romiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)",
 "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)", "java.lang.Thread.run(Thread.java:745)"],
    "causingClass": "java.io.IOException",
    "message": "java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042"
  }
}

Here is the job I am running:

import org.apache.spark.{SparkContext, SparkConf}
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.sql._
import spark.jobserver._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

object CassSparkTest extends SparkJob {
        def main(args: Array[String]) {

                val sc = new SparkContext("spark://192.168.10.11:7077", "test")
                val config = ConfigFactory.parseString("")
                val results = runJob(sc, config)
                println("Results:" + results)
        }
        override def validate(sc:SparkContext, config: Config): SparkJobValidation = {
                SparkJobValid
        }

        override def runJob(sc:SparkContext, config: Config): Any = {
                val sqlC = new CassandraSQLContext(sc)
                val df = sqlC.sql(config.getString("input.sql"))
                df.collect()
        }

}

and here is my configuration file for spark-jobserver

# Template for a Spark Job Server configuration file
# When deployed these settings are loaded when job server starts
#
# Spark Cluster / Job Server configuration
spark {
  # spark.master will be passed to each job's JobContext
  master = "spark://192.168.10.11:7077"
  # master = "mesos://vm28-hulk-pub:5050"
  # master = "yarn-client"

  # Default # of CPUs for jobs to use for Spark standalone cluster
  job-number-cpus = 1

  jobserver {
    port = 2020
    jar-store-rootdir = /tmp/jobserver/jars

    jobdao = spark.jobserver.io.JobFileDAO

    filedao {
      rootdir = /tmp/spark-job-server/filedao/data
    }
  }

  # predefined Spark contexts
  # contexts {
  #   my-low-latency-context {
  #     num-cpu-cores = 1           # Number of cores to allocate.  Required.
  #     memory-per-node = 512m         # Executor memory per node, -Xmx style eg 512m, 1G, etc.
  #   }
  #   # define additional contexts here
  # }

  # universal context configuration.  These settings can be overridden, see README.md
  context-settings {
    num-cpu-cores = 1           # Number of cores to allocate.  Required.
    memory-per-node = 512m         # Executor memory per node, -Xmx style eg 512m, #1G, etc.

    # in case spark distribution should be accessed from HDFS (as opposed to being installed on every mesos slave)
    # spark.executor.uri = "hdfs://namenode:8020/apps/spark/spark.tgz"
    spark-cassandra-connection-host="127.0.0.1"
    # uris of jars to be loaded into the classpath for this context. Uris is a string list, or a string separated by commas ','
    # dependent-jar-uris = ["file:///some/path/present/in/each/mesos/slave/somepackage.jar"]

    dependent-jar-uris = ["file:///home/vagrant/lib/spark-cassandra-connector-assembly-1.3.0-M2-SNAPSHOT.jar"]

    # If you wish to pass any settings directly to the sparkConf as-is, add them here in passthrough,
    # such as hadoop connection settings that don't use the "spark." prefix
    passthrough {
      #es.nodes = "192.1.1.1"

    }
  }

  # This needs to match SPARK_HOME for cluster SparkContexts to be created successfully
  # home = "/home/spark/spark"
}

# Note that you can use this file to define settings not only for job server,
# but for your Spark jobs as well.  Spark job configuration merges with this configuration file as defaults.

Solution

  • @vicg, first you need spark.cassandra.connection.host -- periods not dashes. Also note in the error how the IP is "127.0.1.1", not the one in the config. You can also pass the IP when you create a context, like:

    curl -X POST 'localhost:8090/contexts/my-context?spark.cassandra.connection.host=127.0.0.1'

    If the above don't work, try the following PR: https://github.com/spark-jobserver/spark-jobserver/pull/164