Search code examples
scala, cassandra, apache-spark, datastax, spark-cassandra-connector

Set Cassandra Clustering Order on TableDef with Datastax's Spark Cassandra Connector


Every time I try to create a new table in cassandra with a new TableDef I end up with a clustering order of ascending and I'm trying to get descending.

I'm using Cassandra 2.1.10, Spark 1.5.1, and Datastax Spark Cassandra Connector 1.5.0-M2.

I'm creating a new TableDef

// Define the target table: partition key "parkey" (text), one clustering
// column "ts" (timestamp), and a regular column "name" (text).
val table = TableDef("so", "example", 
  Seq(ColumnDef("parkey", PartitionKeyColumn, TextType)),
  Seq(ColumnDef("ts", ClusteringColumn(0), TimestampType)),
  Seq(ColumnDef("name", RegularColumn, TextType)))

// NOTE(review): SomeColumns("key", "time", "name") does not match the column
// names defined above ("parkey", "ts", "name") — presumably these are the RDD
// field names; verify against the connector's column-mapping rules.
rdd.saveAsCassandraTableEx(table, SomeColumns("key", "time", "name"))

What I'm expecting to see in Cassandra is

CREATE TABLE so.example (
    parkey text,
    ts timestamp,
    name text,
    PRIMARY KEY ((parkey), ts)
) WITH CLUSTERING ORDER BY (ts DESC);

What I end up with is

CREATE TABLE so.example (
    parkey text,
    ts timestamp,
    name text,
    PRIMARY KEY ((parkey), ts)
) WITH CLUSTERING ORDER BY (ts ASC);

How can I force it to set the clustering order to descending?


Solution

  • I was not able to find a direct way of doing this. Additionally there are a lot of other options you may want to specify. I ended up extending ColumnDef and TableDef and overriding the cql method in TableDef. An example of the solution I came up with is below. If someone has a better way or this becomes natively supported I'd be happy to change the answer.

    // Scala Enum
    // Serializable two-value enum describing CQL clustering order (ASC / DESC).
    object ClusteringOrder {
      abstract sealed class Order(val ordinal: Int) extends Ordered[Order]
        with Serializable {
        // Compare by ordinal so Ascending (0) sorts before Descending (1).
        // Fixed: the original compared `that` against `this`, which inverted
        // the Ordered contract (Ascending would have sorted AFTER Descending).
        def compare(that: Order) = this.ordinal compare that.ordinal
    
        def toInt: Int = this.ordinal
      }
    
      case object Ascending extends Order(0)
      case object Descending extends Order(1)
    
      // Look up an Order by its ordinal. Fails with a descriptive message for
      // unknown values instead of a bare Option.get NoSuchElementException.
      def fromInt(i: Int): Order = values.find(_.ordinal == i)
        .getOrElse(throw new NoSuchElementException(s"No ClusteringOrder with ordinal $i"))
    
      val values = Set(Ascending, Descending)
    }
    
    // extend the ColumnDef case class to add enum support
    // NOTE(review): extending a case class is normally discouraged, but the
    // connector declares ColumnDef as a case class, so subclassing is the least
    // invasive way to carry the extra `clusteringOrder` field alongside it.
    class ColumnDefEx(columnName: String, columnRole: ColumnRole, columnType: ColumnType[_],
      indexed: Boolean = false, val clusteringOrder: ClusteringOrder.Order = ClusteringOrder.Ascending)
      extends ColumnDef(columnName, columnRole, columnType, indexed)
    
    // Mimic the ColumnDef object
    // Factories returning ColumnDef so call sites remain drop-in compatible
    // with code written against the connector's own ColumnDef.apply.
    object ColumnDefEx {
      // Fully explicit variant: caller supplies both `indexed` and the order.
      def apply(columnName: String, columnRole: ColumnRole, columnType: ColumnType[_],
        indexed: Boolean, clusteringOrder: ClusteringOrder.Order): ColumnDef = {
        new ColumnDefEx(columnName, columnRole, columnType, indexed, clusteringOrder)
      }
    
      // Convenience variant: non-indexed column with an optional order.
      // NOTE(review): overloading combined with a default argument — fine here
      // because the 4th parameter types (Boolean vs Order) disambiguate, but
      // fragile if more overloads are added.
      def apply(columnName: String, columnRole: ColumnRole, columnType: ColumnType[_],
        clusteringOrder: ClusteringOrder.Order = ClusteringOrder.Ascending): ColumnDef = {
        new ColumnDefEx(columnName, columnRole, columnType, false, clusteringOrder)
      }
    
      // copied from ColumnDef object
      // Builds from driver metadata; clustering order defaults to Ascending.
      def apply(column: ColumnMetadata, columnRole: ColumnRole): ColumnDef = {
        val columnType = ColumnType.fromDriverType(column.getType)
        new ColumnDefEx(column.getName, columnRole, columnType, column.getIndex != null)
      }
    }
    
    // extend the TableDef case class to override the cql method
    class TableDefEx(keyspaceName: String, tableName: String, partitionKey: Seq[ColumnDef],
      clusteringColumns: Seq[ColumnDef], regularColumns: Seq[ColumnDef], options: String)
      extends TableDef(keyspaceName, tableName, partitionKey, clusteringColumns, regularColumns) {
    
      /** CREATE TABLE statement: the parent's CQL, plus a CLUSTERING ORDER BY
        * clause when clustering columns exist, plus any user-supplied options. */
      override def cql = {
        val stmt = super.cql
        val ordered = if (clusteringColumns.nonEmpty)
          s"$stmt\r\nWITH CLUSTERING ORDER BY (${clusteringColumnOrder(clusteringColumns)})"
        else stmt
        appendOptions(ordered, options)
      }
    
      // Render each clustering column as "<name> ASC|DESC"; plain ColumnDefs
      // (no order attached) default to ASC, matching Cassandra's default.
      private[this] def clusteringColumnOrder(clusteringColumns: Seq[ColumnDef]): String =
        clusteringColumns.map {
          case c: ColumnDefEx if c.clusteringOrder == ClusteringOrder.Descending =>
            s"${c.columnName} DESC"
          case c => s"${c.columnName} ASC"
        }.mkString(", ")
    
      // Merge user options into the statement, normalizing the WITH/AND keyword.
      // Fixed two bugs in the original: (1) the "convert leading AND to WITH"
      // branch returned only the rewritten options, silently DROPPING the
      // entire CREATE TABLE statement; (2) empty options appended a dangling
      // CRLF to the statement.
      private[this] def appendOptions(stmt: String, opts: String) =
        if (opts.isEmpty) stmt
        else if (stmt.contains("WITH") && opts.startsWith("WITH")) s"$stmt\r\nAND ${opts.substring(4)}"
        else if (!stmt.contains("WITH") && opts.startsWith("AND")) s"$stmt\r\nWITH ${opts.substring(3)}"
        else s"$stmt\r\n$opts"
    }
    
    // Mimic the TableDef object but return new TableDefEx
    object TableDefEx {
      // Factory mirroring TableDef.apply, with an extra raw `options` string
      // (e.g. "WITH compaction = {...}") appended to the generated CQL.
      def apply(keyspaceName: String, tableName: String, partitionKey: Seq[ColumnDef],
        clusteringColumns: Seq[ColumnDef], regularColumns: Seq[ColumnDef], options: String = "") =
        new TableDefEx(keyspaceName, tableName, partitionKey, clusteringColumns, regularColumns,
          options)
    
      // NOTE(review): this delegates to the ColumnMapper and returns a plain
      // TableDef, NOT a TableDefEx — tables created via fromType will not get
      // the custom clustering-order CQL; confirm this is intended.
      def fromType[T: ColumnMapper](keyspaceName: String, tableName: String): TableDef =
        implicitly[ColumnMapper[T]].newTable(keyspaceName, tableName)
    }
    

    This allowed me to create new tables in this manner:

    // Build the table with a DESCENDING clustering order on "ts" by using the
    // extended ColumnDefEx factory for the clustering column.
    val table = TableDefEx("so", "example", 
      Seq(ColumnDef("parkey", PartitionKeyColumn, TextType)),
      Seq(ColumnDefEx("ts", ClusteringColumn(0), TimestampType, ClusteringOrder.Descending)),
      Seq(ColumnDef("name", RegularColumn, TextType)))
    
    // NOTE(review): SomeColumns("key", "time", "name") does not match the
    // table's column names ("parkey", "ts", "name") — verify the mapping
    // against the RDD's field names before relying on this snippet.
    rdd.saveAsCassandraTableEx(table, SomeColumns("key", "time", "name"))