Tags: scala, apache-spark, cassandra, spark-cassandra-connector

How do I create a table using the Spark Cassandra Connector?


I've recently begun using the Spark Cassandra Connector. So far I've created my table manually in CQL and saved data to it from Spark. Here's a simplified snippet from the docs:

-- CQL, run in cqlsh beforehand
CREATE TABLE test.words (word text PRIMARY KEY, count int);

// Scala, saving an RDD to the existing table
val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

Is there a way to create tables programmatically by inferring the schema from case classes without actually writing the raw queries?


Solution

  • Yes, you can do this with saveAsCassandraTable and saveAsCassandraTableEx, as described in the documentation. The first function creates the table automatically, based on your data (be careful: it will pick one column as the partition key on its own). The second lets you customize the schema by specifying the partition key, clustering columns, etc., like this (code is from the documentation):

    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql._
    import com.datastax.spark.connector.types._
    
    // Define the columns and their roles in the primary key
    val p1Col = new ColumnDef("col1", PartitionKeyColumn, UUIDType)
    val c1Col = new ColumnDef("col2", ClusteringColumn(0), UUIDType)
    val c2Col = new ColumnDef("col3", ClusteringColumn(1), DoubleType)
    val rCol  = new ColumnDef("col4", RegularColumn, IntType)
    
    // Create table definition
    val table = TableDef("test", "words", Seq(p1Col), Seq(c1Col, c2Col), Seq(rCol))
    
    // Case class matching the column layout (not shown in the original snippet)
    case class outData(col1: java.util.UUID, col2: java.util.UUID, col3: Double, col4: Int)
    
    // Map rdd into the custom data structure and create the table
    val rddOut = rdd.map(s => outData(s._1, s._2(0), s._2(1), s._3))
    rddOut.saveAsCassandraTableEx(table, SomeColumns("col1", "col2", "col3", "col4"))
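
    For the simpler automatic case, a minimal sketch of saveAsCassandraTable looks like this (the WordCount case class and the word_counts table name are illustrative, not from the original answer):

    import com.datastax.spark.connector._
    
    // Illustrative case class; its fields become the table's columns
    case class WordCount(word: String, count: Int)
    
    val rdd = sc.parallelize(Seq(WordCount("cat", 30), WordCount("fox", 40)))
    
    // Creates test.word_counts from the inferred schema; the connector
    // chooses the partition key itself, so check the generated table
    // (e.g. with DESCRIBE TABLE in cqlsh) before relying on it
    rdd.saveAsCassandraTable("test", "word_counts")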