java | apache-spark | cassandra | spark-cassandra-connector

saving dataset to cassandra using java spark


I'm trying to save a dataset to a Cassandra DB using Java Spark. I'm able to read data into a dataset successfully using the code below:

Dataset<Row> readdf = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.load();

But when I try to write the dataset, I get an IOException: Could not load or find table, found similar tables in keyspace

readdf.write().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.save();

I'm setting the host and port in the SparkSession. The thing is, I'm able to write in overwrite and append modes, but I'm not able to create a table.

The versions I'm using are: Spark (Java) 2.0 and Spark Cassandra Connector 2.3.

I've tried different jar versions but nothing worked. I've also gone through various Stack Overflow and GitHub links.

Any help is greatly appreciated.


Solution

  • The write operation in Spark doesn't have a mode that automatically creates a table for you - there are multiple reasons for that. One of them is that you need to define a primary key for your table; otherwise you may just overwrite data if you set an incorrect primary key. Because of this, the Spark Cassandra Connector provides a separate method to create a table based on your dataframe's structure, but you need to provide the lists of partition key and clustering key columns. In Java it looks as follows (full code is here):

    import java.util.Arrays;
    import scala.Option;
    import scala.Some;
    import scala.collection.JavaConversions;
    import scala.collection.Seq;
    import com.datastax.spark.connector.DataFrameFunctions;
    import com.datastax.spark.connector.cql.CassandraConnector;
    import com.datastax.spark.connector.cql.CassandraConnectorConf;

    DataFrameFunctions dfFunctions = new DataFrameFunctions(dataset);
    Option<Seq<String>> partitionSeqlist = new Some<>(JavaConversions.asScalaBuffer(
              Arrays.asList("part")).seq());
    Option<Seq<String>> clusteringSeqlist = new Some<>(JavaConversions.asScalaBuffer(
              Arrays.asList("clust", "col2")).seq());
    CassandraConnector connector = new CassandraConnector(
              CassandraConnectorConf.apply(spark.sparkContext().getConf()));
    dfFunctions.createCassandraTable("test", "widerows6",
              partitionSeqlist, clusteringSeqlist, connector);
    

    and then you can write data as usual:

    dataset.write()
       .format("org.apache.spark.sql.cassandra")
       .options(ImmutableMap.of("table", "widerows6", "keyspace", "test"))
       .save();
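
    To tie it together, here is a minimal end-to-end sketch of the read/append cycle once the table exists (created via createCassandraTable above or beforehand). It assumes a Cassandra node at 127.0.0.1:9042 and reuses the hypothetical "test" keyspace and "widerows6" table from the answer; adjust those to your cluster. The key point is that writes to an existing table need an explicit SaveMode such as Append:

    ```java
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class AppendToCassandra {
        public static void main(String[] args) {
            // Hypothetical connection settings - point these at your own cluster.
            SparkSession spark = SparkSession.builder()
                .appName("append-example")
                .config("spark.cassandra.connection.host", "127.0.0.1")
                .config("spark.cassandra.connection.port", "9042")
                .getOrCreate();

            // Read the existing table into a dataset.
            Dataset<Row> dataset = spark.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "test")
                .option("table", "widerows6")
                .load();

            // Append rows to the already-existing table; without an explicit
            // SaveMode the default (ErrorIfExists) makes the write fail.
            dataset.write()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "test")
                .option("table", "widerows6")
                .mode(SaveMode.Append)
                .save();

            spark.stop();
        }
    }
    ```

    SaveMode.Overwrite also works against an existing table, but with the connector it truncates the table first, so Append is the safer default when you only want to add rows.
    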