I have the following Cassandra data model:
CREATE TABLE table (
    id uuid,
    timestamp timestamp STATIC,
    value1 text STATIC,
    value2 int,
    value3 text,
    data map<text, text>,
    PRIMARY KEY ( id, value2 )
);
So far so good. Now I have two types of rows I want to insert. The first:
INSERT INTO table (id, timestamp, value1)
VALUES (<uuid>, <timestamp>, <some-string>);
and
INSERT INTO table (id, value2, value3, data)
VALUES (<some-id>, <some-int>, <some-string>, <some-simple-json-map>);
Both CQL statements work fine. Now I am developing my Spark Streaming app, which receives data and formats it into DataFrames shaped like my CQL statements. For every set of values for statement 1, I get several sets for statement 2.
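For illustration, here is a minimal sketch of how such a static-only DataFrame could be built (the case class, app name, and sample values are placeholders, not the real streaming pipeline):

import org.apache.spark.sql.SparkSession

// Placeholder shape for the static-only rows (id, timestamp, value1).
case class StaticRow(id: String, timestamp: Long, value1: String)

val spark = SparkSession.builder().appName("static-rows-example").getOrCreate()
import spark.implicits._

// Sample data only; in the real app these rows come from the stream.
val data = Seq(
  StaticRow("233177ff-439b-4a2d-a8b0-4db742b4bc1b", 1356998700000L, "one")
).toDF()

data.printSchema()  // id: string, timestamp: long, value1: string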
The issue: when I save a DataFrame shaped like the first CQL statement, Spark throws a Cassandra exception:
java.lang.IllegalArgumentException:
Some primary key columns are missing in RDD or have not been selected: value2
at com.datastax.spark.connector.writer.TableWriter$.checkMissingPrimaryKeyColumns(TableWriter.scala:190)
at com.datastax.spark.connector.writer.TableWriter$.checkColumns(TableWriter.scala:257)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:275)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
...
Here is the schema of my DataFrame:
root
|-- id: string (nullable = true)
|-- timestamp: long (nullable = true)
|-- value1: string (nullable = true)
My save statement:
data.select("id", "timestamp", "value1")
  .write.format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("keyspace" -> "some_keyspace", "table" -> "table"))
  .save()
When I add the value2 column to my DataFrame, the statement works fine:
root
|-- id: string (nullable = true)
|-- timestamp: long (nullable = true)
|-- value1: string (nullable = true)
|-- value2: integer (nullable = false)
Is there a way to make this work like the CQL statement, without changing the data model?
A static column is a special column that is shared by all rows of the same partition. In the first insert example, CQL allows you to insert static-only data for the partition:
insert into ctest (id, timestamp, value1) VALUES (233177ff-439b-4a2d-a8b0-4db742b4bc1b, '2013-01-01 00:05+0000', 'one');
select * from ctest;
id | value2 | timestamp | value1 | data | value3
--------------------------------------+--------+--------------------------+--------+------+--------
233177ff-439b-4a2d-a8b0-4db742b4bc1b | null | 2013-01-01 00:05:00+0000 | one | null | null
This creates a synthetic row; it looks like a row, but it is just a partition key and its static columns. If you insert a real row having a full primary key, the null value2 disappears:
insert into ctest (id, value2) VALUES (233177ff-439b-4a2d-a8b0-4db742b4bc1b, 1);
select * from ctest;
id | value2 | timestamp | value1 | data | value3
--------------------------------------+--------+--------------------------+--------+------+--------
233177ff-439b-4a2d-a8b0-4db742b4bc1b | 1 | 2013-01-01 00:05:00+0000 | one | null | null
(1 rows)
Having performed two inserts, you still have only one row.
The Spark-Cassandra connector is stricter and does not allow you to insert static-only data for a partition. The connector code checks that all columns of the primary key are defined. It finds two primary key columns (id, value2), of which only one is set, and raises an error. Below is the check in com.datastax.spark.connector.writer.TableWriter:
private def checkMissingPrimaryKeyColumns(table: TableDef, columnNames: Seq[String]) {
  val primaryKeyColumnNames = table.primaryKey.map(_.columnName)
  val missingPrimaryKeyColumns = primaryKeyColumnNames.toSet -- columnNames
  if (missingPrimaryKeyColumns.nonEmpty)
    throw new IllegalArgumentException(
      s"Some primary key columns are missing in RDD or have not been selected: ${missingPrimaryKeyColumns.mkString(", ")}")
}
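If you need the static-only inserts from Spark anyway, one possible workaround is to bypass the DataFrame writer and execute the CQL statement yourself through the connector's managed session. This is only a sketch, assuming a DataFrame df that holds exactly (id, timestamp, value1), a timestamp column carrying epoch milliseconds, and the keyspace/table names from the question:

import java.util.{Date, UUID}
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.Row

// Sketch: write the static-only rows via a plain CQL insert, which the
// TableWriter check above never sees. "spark" and "df" are assumed to exist;
// keyspace and table names follow the question.
val connector = CassandraConnector(spark.sparkContext.getConf)

df.foreachPartition { rows: Iterator[Row] =>
  connector.withSessionDo { session =>
    val stmt = session.prepare(
      "INSERT INTO some_keyspace.table (id, timestamp, value1) VALUES (?, ?, ?)")
    rows.foreach { row =>
      session.execute(stmt.bind(
        UUID.fromString(row.getAs[String]("id")),
        new Date(row.getAs[Long]("timestamp")),  // assumes epoch millis
        row.getAs[String]("value1")))
    }
  }
}

Because this goes straight through the driver, it behaves exactly like the cqlsh insert above and produces the same static-only partition data.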
For a little more background on this, the book Learning Apache Cassandra by Mat Brown discusses static-only inserts on pp. 53-54.