How to get a Cassandra CQL string from an Apache Spark DataFrame in 2.2.0?


I am trying to get a CQL string from a DataFrame. I came across the function TableDef.fromDataFrame, with which I can do something like this:

TableDef.fromDataFrame(df, "test", "hello", ProtocolVersion.NEWEST_SUPPORTED).cql()

It looks to me like the library uses the first column as the partition key and ignores clustering keys. How do I specify that one set of DataFrame columns should be the partition key and another set the clustering key?

It looks like I can create a new TableDef, but then I have to do the entire mapping myself, and in some cases the necessary pieces, such as ColumnType, are not accessible from Java. For example, I tried to create a new ColumnDef like this:

new ColumnDef("col5", new PartitionKeyColumn(), /* ColumnType is not accessible in Java */)

Objective: To get a CQL create Statement from a Spark DataFrame.

Input: My DataFrame can have any number of columns, each with its own Spark type. Say I have a Spark DataFrame with 100 columns, where col8 and col9 correspond to the Cassandra partition key columns and col10 corresponds to the Cassandra clustering key column.

col1| col2| ...|col100

Now I want the spark-cassandra-connector library to give me a CQL create table statement given the info above.

Desired Output

create table if not exists test.hello (
   col1 bigint,   -- whatever col1's type is in my dataframe; bigint is just an example
   col2 varchar,
   col3 double,
   ...
   ...
   col100 bigint,
   primary key((col8, col9), col10)
) WITH CLUSTERING ORDER BY (col10 DESC);

Solution

  • Because the required components (PartitionKeyColumn and the instances of ColumnType) are objects in Scala, you need to use the following syntax to access their instances:

    // imports
    import com.datastax.spark.connector.cql.ColumnDef;
    import com.datastax.spark.connector.cql.PartitionKeyColumn$;
    import com.datastax.spark.connector.types.TextType$;
    
    // actual code
    ColumnDef a = new ColumnDef("col5",  
          PartitionKeyColumn$.MODULE$, TextType$.MODULE$);
    

    See code for ColumnRole & PrimitiveTypes to find full list of names of objects/classes.
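
As background on why the `$.MODULE$` suffix appears: the Scala compiler turns each `object` into a class whose name ends in `$` and stores the single instance in a public static field named `MODULE$`. The plain-Java sketch below imitates that layout so the access pattern can be tried without Scala on the classpath. `Role`, `PartitionKeyRole$` and `ModuleDemo` are hypothetical stand-ins, not connector classes.

```java
// Hypothetical stand-in for a Scala trait such as ColumnRole.
interface Role {}

// Imitates the class the Scala compiler emits for an `object`:
// the class name ends in `$` and the singleton lives in MODULE$.
final class PartitionKeyRole$ implements Role {
    public static final PartitionKeyRole$ MODULE$ = new PartitionKeyRole$();

    // Private constructor: the only instance is MODULE$.
    private PartitionKeyRole$() {}

    @Override
    public String toString() {
        return "PartitionKeyRole";
    }
}

final class ModuleDemo {
    // Java callers reach the singleton through the static field,
    // exactly as in PartitionKeyColumn$.MODULE$ above.
    static Role partitionKey() {
        return PartitionKeyRole$.MODULE$;
    }

    public static void main(String[] args) {
        System.out.println(partitionKey());
    }
}
```

Writing `new PartitionKeyRole$()` from outside the class does not compile here, which mirrors why `new PartitionKeyColumn()` in the question cannot work.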

    Update after additional requirements: Code is lengthy, but should work...

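    // imports
    import java.util.*;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import com.datastax.driver.core.ProtocolVersion;
    import com.datastax.spark.connector.cql.ClusteringColumn;
    import com.datastax.spark.connector.cql.ColumnDef;
    import com.datastax.spark.connector.cql.PartitionKeyColumn$;
    import com.datastax.spark.connector.cql.RegularColumn$;
    import com.datastax.spark.connector.cql.TableDef;
    import scala.collection.JavaConverters;

    // actual code
    SparkSession spark = SparkSession.builder()
                    .appName("Java Spark SQL example").getOrCreate();

    // Names of the DataFrame columns that form the partition key
    Set<String> partitionKeys = new TreeSet<>(Arrays.asList("col1", "col2"));
    // Clustering columns mapped to their position in the clustering key
    Map<String, Integer> clusteringKeys = new TreeMap<>();
    clusteringKeys.put("col8", 0);
    clusteringKeys.put("col9", 1);

    // Let the connector infer the Cassandra column types from the DataFrame
    Dataset<Row> df = spark.read().json("my-test-file.json");
    TableDef td = TableDef.fromDataFrame(df, "test", "hello",
                    ProtocolVersion.NEWEST_SUPPORTED);

    List<ColumnDef> partKeyList = new ArrayList<>();
    List<ColumnDef> clusterColumnList = new ArrayList<>();
    List<ColumnDef> regColumnList = new ArrayList<>();

    // Re-create every column with the role we want, keeping the inferred type
    scala.collection.Iterator<ColumnDef> iter = td.allColumns().iterator();
    while (iter.hasNext()) {
            ColumnDef col = iter.next();
            String colName = col.columnName();
            if (partitionKeys.contains(colName)) {
                    partKeyList.add(new ColumnDef(colName,
                                    PartitionKeyColumn$.MODULE$, col.columnType()));
            } else if (clusteringKeys.containsKey(colName)) {
                    int idx = clusteringKeys.get(colName);
                    clusterColumnList.add(new ColumnDef(colName,
                                    new ClusteringColumn(idx), col.columnType()));
            } else {
                    regColumnList.add(new ColumnDef(colName,
                                    RegularColumn$.MODULE$, col.columnType()));
            }
    }
    // Columns arrive in DataFrame order; put clustering columns in key order
    clusterColumnList.sort(
                    Comparator.comparingInt(c -> clusteringKeys.get(c.columnName())));

    // A java.util.List is not a scala.collection.Seq, so a plain cast fails
    // at runtime; convert explicitly (assuming the Scala 2.11 JavaConverters API)
    TableDef newTd = new TableDef(td.keyspaceName(), td.tableName(),
                    JavaConverters.asScalaBufferConverter(partKeyList).asScala(),
                    JavaConverters.asScalaBufferConverter(clusterColumnList).asScala(),
                    JavaConverters.asScalaBufferConverter(regColumnList).asScala(),
                    td.indexes(), td.isView());
    String cql = newTd.cql();
    System.out.println(cql);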
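
If the statement also has to carry a clustering order, as in the desired output from the question, one alternative is to assemble the CQL by hand once the column names and CQL types are known. The following is only a minimal sketch in plain Java: `CreateTableSketch` and `createTableCql` are hypothetical names, not part of spark-cassandra-connector, and the sketch assumes lower-case identifiers that need no quoting and a caller who has already mapped each Spark type to a CQL type.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class CreateTableSketch {

    // Hypothetical helper, not a connector API.
    // columns: column name -> CQL type, in the order they should be listed.
    // clusteringOrder: clustering column -> "ASC" or "DESC", in key order.
    static String createTableCql(String keyspace, String table,
                                 Map<String, String> columns,
                                 List<String> partitionKey,
                                 Map<String, String> clusteringOrder) {
        if (partitionKey.isEmpty()) {
            throw new IllegalArgumentException("partition key must not be empty");
        }
        for (String key : partitionKey) requireColumn(columns, key);
        for (String key : clusteringOrder.keySet()) requireColumn(columns, key);

        // One "name type," line per column
        String columnList = columns.entrySet().stream()
                .map(e -> "  " + e.getKey() + " " + e.getValue() + ",\n")
                .collect(Collectors.joining());

        // Partition key in its own parentheses, then the clustering columns
        String primaryKey = "(" + String.join(", ", partitionKey) + ")";
        if (!clusteringOrder.isEmpty()) {
            primaryKey += ", " + String.join(", ", clusteringOrder.keySet());
        }

        StringBuilder cql = new StringBuilder()
                .append("CREATE TABLE IF NOT EXISTS ")
                .append(keyspace).append('.').append(table).append(" (\n")
                .append(columnList)
                .append("  PRIMARY KEY (").append(primaryKey).append(")\n)");
        if (!clusteringOrder.isEmpty()) {
            String order = clusteringOrder.entrySet().stream()
                    .map(e -> e.getKey() + " " + e.getValue())
                    .collect(Collectors.joining(", "));
            cql.append(" WITH CLUSTERING ORDER BY (").append(order).append(")");
        }
        return cql.append(";").toString();
    }

    private static void requireColumn(Map<String, String> columns, String name) {
        if (!columns.containsKey(name)) {
            throw new IllegalArgumentException("unknown column: " + name);
        }
    }

    public static void main(String[] args) {
        // Example data only; real names and types would come from the DataFrame
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("col8", "bigint");
        columns.put("col9", "varchar");
        columns.put("col10", "double");
        Map<String, String> order = new LinkedHashMap<>();
        order.put("col10", "DESC");
        System.out.println(createTableCql("test", "hello", columns,
                Arrays.asList("col8", "col9"), order));
    }
}
```

The helper takes insertion-ordered maps so that the column list and the clustering key come out in the order the caller chose, and it rejects key columns that are missing from the column map instead of emitting an invalid statement.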