apache-spark-sql cassandra cassandra-3.0 spark-cassandra-connector

How to insert a Dataset<Row> into Cassandra with the Java API


I need sample code for Spark Cassandra Connector 2.0.5 (Scala 2.11). I am unable to insert a Dataset into Cassandra directly.


Solution

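    1. Add the following Cassandra connector dependencies (shown here for connector 1.2.1 on Scala 2.10; use the artifact suffix and version that match your Scala and Spark versions)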

      <dependency>
          <groupId>com.datastax.spark</groupId>
          <artifactId>spark-cassandra-connector_2.10</artifactId>
          <version>1.2.1</version>
      </dependency>
      <dependency>
          <groupId>com.datastax.spark</groupId>
          <artifactId>spark-cassandra-connector-java_2.10</artifactId>
          <version>1.2.1</version>
      </dependency>
      
    2. Set your Cassandra connection details on the SparkConf

      SparkConf sparkConf = new SparkConf();
      sparkConf.setAppName("Spark-Cassandra Integration");
      sparkConf.setMaster("local[4]"); // run locally with 4 worker threads
      // Cassandra contact point and ports
      sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
      sparkConf.set("spark.cassandra.connection.native.port", "9042");
      sparkConf.set("spark.cassandra.connection.rpc.port", "9160");
      // Timeouts in milliseconds
      sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
      sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
      // Credentials, needed only if authentication is enabled on the cluster
      sparkConf.set("spark.cassandra.auth.username", "test_user");
      sparkConf.set("spark.cassandra.auth.password", "test_password");
      
    3. Create the Spark context

      JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
      
    4. Once you have your RDD, save it to Cassandra by providing the keyspace name and table name

      CassandraJavaUtil.javaFunctions(userRDD)
          .writerBuilder(keySpaceName, tableName, CassandraJavaUtil.mapToRow(UserBean.class))
          .saveToCassandra();
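The steps above save an RDD, while the question asks about a Dataset<Row>. With connector 2.0.x, a Dataset can usually be written through the connector's Spark SQL data source, `org.apache.spark.sql.cassandra`, which takes `keyspace` and `table` options. The following is only a minimal sketch under those assumptions: `CassandraWriteOptions` is a hypothetical helper (not part of Spark or the connector), and `test_keyspace` and `users` are placeholder names. The Spark call itself appears in the trailing comment because it needs spark-sql and the connector on the classpath, plus a reachable cluster.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper (not part of Spark or the connector): builds the option
// map that the connector's Spark SQL data source uses to locate a table.
final class CassandraWriteOptions {
    // Data source name registered by the Spark Cassandra Connector.
    static final String FORMAT = "org.apache.spark.sql.cassandra";

    private CassandraWriteOptions() {}

    // Returns the "keyspace" and "table" options for DataFrameWriter.options(...).
    static Map<String, String> forTable(String keyspace, String table) {
        Objects.requireNonNull(keyspace, "keyspace");
        Objects.requireNonNull(table, "table");
        Map<String, String> options = new LinkedHashMap<>();
        options.put("keyspace", keyspace);
        options.put("table", table);
        return options;
    }
}

// Usage inside a Spark job, where `ds` is a Dataset<Row> whose column names
// match the columns of the target Cassandra table (placeholder names below):
//
//   ds.write()
//     .format(CassandraWriteOptions.FORMAT)
//     .options(CassandraWriteOptions.forTable("test_keyspace", "users"))
//     .mode(SaveMode.Append)   // org.apache.spark.sql.SaveMode
//     .save();
```

This assumes the target table already exists in Cassandra. The connection settings from step 2 still apply, since the data source reads the same `spark.cassandra.*` properties from the Spark configuration.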
      

    For more details, see my blog post "Save data to Cassandra tables using Apache Spark".