Search code examples
javascalacassandraapache-sparkshark-sql

Datastax DSE Cassandra, Spark, Shark, Standalone Programm


I use Datastax Enterprise 4.5. I hope I did the config right, I did it like on datastax website explained. I can write into the Cassandra DB with an Windowsservice, this works but i can't query with Spark using the where function.

I start the Cassandra node (there is only one for test purpose) with "./dse cassandra -k -t" (in the /bin folder) so hadoop and spark are running both. I can write into Cassandra without a problem.

So you cannot use a 'where' clause in a Cassandra query when the 'where' isn't the RowKey. So I need to use Spark/Shark. I can start and use all queries I need with shark (./dse shark) but I need to write a Standalone program in Scala or Java.

So I tried this link: https://github.com/datastax/spark-cassandra-connector

And I can query a simple statement like:

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "MY_IP")
  .setMaster("spark://MY_IP:7077")
  .setAppName("SparkTest")

// Connect to the Spark cluster:
lazy val sc = new SparkContext(conf)

val rdd = sc.cassandraTable("keyspace", "tablename")
println(rdd.first)

and this works well but if I ask for more line or count:

println(rdd.count)
rdd.toArray.foreach(println)

then I get this exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

When I try this in Java I have the same problem. Does anyone know this problem? I dont know if the DB config is correct or if the scala/Javaprogram works correct. Maybe some Ports a blocked but 7077 and 4040 are open.

Sidenote: If I start spark on the Cassandra DB, I can do queries like:

sc.cassandraTable("test","words").select("word").toArray.foreach(println) 

But if I use a "where" clause like:

sc.cassandraTable("test","words").select("word").where("word = ?","foo").toArray.foreach(println)

I get this exception:

java.io.IOException: Exception during query execution: SELECT "word" FROM "test"."words" WHERE token("word") > 0 AND word = ? ALLOW FILTERING

Do you have an Idea why? I thought I can use where clauses in spark?

Thank you!


Solution

  • So far this is my solution. It is not the answer to all of my questions but it works for me and I wanna share it to you.

    I use the hive jdbc driver to access a SharkServer with Java. How It works:

    Start sharkserver: bin/dse shark --service sharkserver -p <port>

    Dependencies for Maven:

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>0.13.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>0.20.2</version>
    </dependency>
    

    Java Code:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    
    public class HiveJdbcClient {
      private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
    
      public static void main(String[] args) throws SQLException {
        try {
          Class.forName(driverName);
        } catch (ClassNotFoundException e) {
          e.printStackTrace();
          System.exit(1);
        }
        Connection con = DriverManager.getConnection("jdbc:hive://YOUR_IP:YOUR_PORT/default", "", "");
        Statement stmt = con.createStatement();
        String sql;
        ResultSet res;
    
    
    
        sql = "SELECT * FROM keyspace.colFam WHERE name = 'John'";
        res = stmt.executeQuery(sql);
        while (res.next()) {
            System.out.println(res.getString("name"));
       }
     }
    }