
Spark Cassandra Connector behaves differently from CQL shell - insert issue


I have the following Cassandra data model:

  CREATE TABLE table (
    id uuid,
    timestamp timestamp STATIC,
    value1 text STATIC,
    value2 int,
    value3 text,
    data map <text,text>,
    PRIMARY KEY ( id, value2 )
);

So far so good. Now I have two kinds of inserts. The first:

INSERT INTO table (id, timestamp, value1)
VALUES (<uuid>,<timestamp>,<some-string>);

and

INSERT INTO table (id, value2, value3, data)
VALUES (<some-id>,<some-int>,<some-string>, <some-simple-json-map>);

Both CQL statements work fine. Now I am developing a Spark Streaming app that receives data and formats it into DataFrames shaped like my CQL statements. For each row of the first kind, I get several rows of the second kind.
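To make the relationship between the two shapes concrete, here is a plain-Scala sketch (no Spark) of fanning one incoming record out into one row of the first shape and several rows of the second. The case classes `Entry`, `Incoming`, `StaticRow` and `ClusteringRow` and the `SplitRows.split` helper are hypothetical names assumed only for illustration; in the streaming app each resulting collection would presumably become its own DataFrame.

```scala
// Hypothetical record types that mirror the two INSERT shapes above.
// One incoming record carries the partition-level (static) values plus
// several entries, each of which becomes one clustering row keyed by value2.
final case class Entry(value2: Int, value3: String, data: Map[String, String])
final case class Incoming(id: String, timestamp: Long, value1: String, entries: Seq[Entry])

// First INSERT shape: partition key + static columns only.
final case class StaticRow(id: String, timestamp: Long, value1: String)

// Second INSERT shape: full primary key (id, value2) + regular columns.
final case class ClusteringRow(id: String, value2: Int, value3: String, data: Map[String, String])

object SplitRows {
  // Fan one incoming record out into one static row and N clustering rows,
  // all sharing the same partition key `id`.
  def split(record: Incoming): (StaticRow, Seq[ClusteringRow]) = {
    val staticRow = StaticRow(record.id, record.timestamp, record.value1)
    val clusteringRows =
      record.entries.map(e => ClusteringRow(record.id, e.value2, e.value3, e.data))
    (staticRow, clusteringRows)
  }
}
```

Each `StaticRow` corresponds to the first INSERT and each `ClusteringRow` to the second; it is the `StaticRow` shape, written on its own, that triggers the error below.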

The problem: when I save a DataFrame shaped like the first CQL statement, Spark throws an exception:

    java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: value2
        at com.datastax.spark.connector.writer.TableWriter$.checkMissingPrimaryKeyColumns(TableWriter.scala:190)
        at com.datastax.spark.connector.writer.TableWriter$.checkColumns(TableWriter.scala:257)
        at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:275)
        at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
        ...

Here is the schema of my DataFrame:

 root
  |-- id: string (nullable = true)
  |-- timestamp: long (nullable = true)
  |-- value1: string (nullable = true)

My save statement:

 import org.apache.spark.sql.SaveMode

 // Write only the partition key and the static columns
 data.select("id", "timestamp", "value1")
   .write.format("org.apache.spark.sql.cassandra")
   .mode(SaveMode.Append)
   .options(Map("keyspace" -> "some_keyspace", "table" -> "table"))
   .save()

When I add the value2 column to my DataFrame, the statement works fine:

root
 |-- id: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- value1: string (nullable = true)
 |-- value2: integer (nullable = false)

Is there a way to make it work like the CQL statement without changing the data model?


Solution

  • A static column is a special column that is shared by all the rows of the same partition. In the first insert example, CQL allows you to insert static-only data for the partition:

    insert into ctest (id, timestamp, value1) VALUES (233177ff-439b-4a2d-a8b0-4db742b4bc1b, '2013-01-01 00:05+0000', 'one');

    select * from ctest;

     id                                   | value2 | timestamp                | value1 | data | value3
    --------------------------------------+--------+--------------------------+--------+------+--------
     233177ff-439b-4a2d-a8b0-4db742b4bc1b |   null | 2013-01-01 00:05:00+0000 |    one | null |   null
    

    This creates a synthetic row: it looks like a row, but it is just a partition key and static columns. If you then insert a real row with the full primary key (including the clustering column value2), the null value disappears:

    insert into ctest (id, value2) VALUES (233177ff-439b-4a2d-a8b0-4db742b4bc1b, 1);

    select * from ctest;

     id                                   | value2 | timestamp                | value1 | data | value3
    --------------------------------------+--------+--------------------------+--------+------+--------
     233177ff-439b-4a2d-a8b0-4db742b4bc1b |      1 | 2013-01-01 00:05:00+0000 |    one | null |   null
    
    (1 rows)
    

    After these two inserts, there is still only one row.

    The Spark Cassandra Connector is stricter: it does not allow you to insert static-only data for a partition. Before writing, the connector checks that every primary key column is among the columns being written. The primary key here has two columns (id, value2), only id is provided, so it raises an error. Below is the check in com.datastax.spark.connector.writer.TableWriter:

     private def checkMissingPrimaryKeyColumns(table: TableDef, columnNames: Seq[String]) {
        val primaryKeyColumnNames = table.primaryKey.map(_.columnName)
        val missingPrimaryKeyColumns = primaryKeyColumnNames.toSet -- columnNames
        if (missingPrimaryKeyColumns.nonEmpty)
          throw new IllegalArgumentException(
            s"Some primary key columns are missing in RDD or have not been selected: ${missingPrimaryKeyColumns.mkString(", ")}")
      }
    

    For more background, the book Learning Apache Cassandra by Mat Brown discusses static-only inserts on pages 53-54.
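The following plain-Scala sketch models, without Cassandra or Spark, the two behaviors described in the answer: a partition holding only static cells shows up as a single synthetic row; a later row insert with value2 fills that same partition, so there is still one row; and a set difference like the connector's check flags value2 as missing. `Partition`, `insertStatic`, `insertRow`, `render` and `missingPrimaryKeyColumns` are hypothetical names for illustration, not driver or connector APIs, and the model deliberately ignores write timestamps, deletes and column types.

```scala
// Hypothetical, simplified model of one Cassandra partition for the table above.
// Not a driver or connector API; it ignores write timestamps, deletes and types.
object StaticOnlyDemo {
  // Static cells are shared by the whole partition; regular cells live in
  // clustering rows keyed by the clustering column value2.
  final case class Partition(statics: Map[String, String], rows: Map[Int, Map[String, String]])

  val empty: Partition = Partition(Map.empty, Map.empty)

  // Like the first INSERT: only partition key + static columns.
  def insertStatic(p: Partition, cells: Map[String, String]): Partition =
    p.copy(statics = p.statics ++ cells)

  // Like the second INSERT: full primary key (id, value2); upserts the row.
  def insertRow(p: Partition, value2: Int, cells: Map[String, String]): Partition =
    p.copy(rows = p.rows.updated(value2, p.rows.getOrElse(value2, Map.empty[String, String]) ++ cells))

  // Roughly what `select *` shows for one partition: one line per clustering row
  // carrying the static cells, or a single synthetic line (value2 = None)
  // when the partition has static cells but no rows.
  def render(p: Partition): Seq[(Option[Int], Map[String, String])] =
    if (p.rows.isEmpty && p.statics.nonEmpty) Seq((None, p.statics))
    else p.rows.toSeq.sortBy(_._1).map { case (k, cells) => (Some(k), p.statics ++ cells) }

  // Same set difference as the connector's checkMissingPrimaryKeyColumns,
  // but returning the missing names instead of throwing.
  def missingPrimaryKeyColumns(primaryKey: Seq[String], columnNames: Seq[String]): Set[String] =
    primaryKey.toSet -- columnNames
}
```

For example, `render(insertRow(insertStatic(empty, Map("value1" -> "one")), 1, Map.empty))` yields a single line with value2 = `Some(1)` and value1 = `"one"`, while `missingPrimaryKeyColumns(Seq("id", "value2"), Seq("id", "timestamp", "value1"))` yields `Set("value2")`, the column named in the exception.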