Tags: java, apache-spark, apache-spark-sql, cassandra, spark-cassandra-connector

Spark 2 session for Cassandra SQL queries


In Spark 2.0, what is the best way to create a Spark session? In both Spark 2.0 and Cassandra, the APIs have been reworked, essentially deprecating SQLContext (and also CassandraSQLContext). So to execute SQL, I either create a Cassandra session (com.datastax.driver.core.Session) and call execute(" "), or I create a SparkSession (org.apache.spark.sql.SparkSession) and call its sql(String sqlText) method.

I don't know the SQL limitations of either approach. Can someone explain them?

Also, if I have to create the SparkSession, how do I do it? I couldn't find a suitable example, and with the APIs reworked, the old examples don't work. I was going through this code sample, DataFrames, but it is not clear which SQL context is being used there, or whether that is the right approach going forward. (For some reason the deprecated APIs don't even compile; I need to check my Eclipse settings.)

Thanks


Solution

  • You need a Cassandra Session to create or drop keyspaces and tables in the Cassandra database. In a Spark application, you get a Cassandra Session by passing the SparkConf to CassandraConnector. In Spark 2.0 you can do it as shown below.

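    import com.datastax.driver.core.Session;
    import com.datastax.spark.connector.cql.CassandraConnector;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession
            .builder()
            .appName("SparkCassandraApp")
            .config("spark.cassandra.connection.host", "localhost")
            .config("spark.cassandra.connection.port", "9042")
            .master("local[2]")
            .getOrCreate();

    // CassandraConnector picks up the spark.cassandra.* settings from the SparkConf
    CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
    // try-with-resources closes the session when done; mykeyspace must already exist
    try (Session session = connector.openSession()) {
        session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");
    }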
    

    If you already have a DataFrame, you can also create a Cassandra table matching its schema using DataFrameFunctions.createCassandraTable, which takes the target keyspace and table names. See the API details here.

    You can read data from Cassandra using the data source provided by spark-cassandra-connector, as shown below.

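    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Load mykeyspace.mytable as a DataFrame through the connector's data source.
    // Chained option() calls avoid double-brace HashMap initialization,
    // which creates an anonymous HashMap subclass.
    Dataset<Row> dataset = spark.read()
            .format("org.apache.spark.sql.cassandra")
            .option("keyspace", "mykeyspace")
            .option("table", "mytable")
            .load();

    dataset.show();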
    

    To run SQL, register the DataFrame returned by the Spark Cassandra connector as a temporary view and query it with SparkSession.sql(), as shown below.

    dataset.createOrReplaceTempView("usertable");
    Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
    dataset1.show();
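The answer says a Cassandra Session is what you use to create or drop keyspaces and tables, but it only shows CREATE TABLE, which fails unless mykeyspace already exists. As a hedged sketch, the hypothetical helper class CqlDdl below only builds the CQL strings you would hand to session.execute(...); it uses nothing but the JDK and never talks to Cassandra. The SimpleStrategy replication with a caller-chosen replication factor is an assumption suited to a local single-node setup, not a production recommendation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: formats CQL DDL statements as strings.
// It does not connect to Cassandra; pass the results to session.execute(...).
public class CqlDdl {

    // Assumption: SimpleStrategy, which is only appropriate for a single datacenter / dev node.
    static String createKeyspace(String keyspace, int replicationFactor) {
        return "CREATE KEYSPACE IF NOT EXISTS " + keyspace
                + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': "
                + replicationFactor + "}";
    }

    // columns maps column name -> CQL type, in declaration order;
    // the first column becomes the single-column primary key.
    static String createTable(String keyspace, String table, Map<String, String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("a table needs at least one column");
        }
        StringJoiner cols = new StringJoiner(", ");
        boolean first = true;
        for (Map.Entry<String, String> c : columns.entrySet()) {
            cols.add(c.getKey() + " " + c.getValue() + (first ? " PRIMARY KEY" : ""));
            first = false;
        }
        return "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " (" + cols + ")";
    }

    static String dropTable(String keyspace, String table) {
        return "DROP TABLE IF EXISTS " + keyspace + "." + table;
    }

    static String dropKeyspace(String keyspace) {
        return "DROP KEYSPACE IF EXISTS " + keyspace;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so "id" stays the primary key column.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "UUID");
        columns.put("username", "TEXT");
        columns.put("email", "TEXT");

        System.out.println(createKeyspace("mykeyspace", 1));
        System.out.println(createTable("mykeyspace", "mytable", columns));
        System.out.println(dropTable("mykeyspace", "mytable"));
        System.out.println(dropKeyspace("mykeyspace"));
    }
}
```

With a session opened as in the answer, you would run session.execute(CqlDdl.createKeyspace("mykeyspace", 1)) before the CREATE TABLE statement. The IF NOT EXISTS / IF EXISTS clauses make re-running the job harmless. Names are concatenated unquoted, so only pass trusted identifiers.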