I am using Kafka Connect to sink data from a Kafka topic into a Cassandra table. I want to add a column holding the timestamp at which the insert/update happens in Cassandra. In Postgres that is easy with functions and triggers, but the same approach is not an option in Cassandra, because I cannot add Java code to the Cassandra cluster. So I am looking for a way to inject a now() call at some point in Kafka Connect, so that the result of the function is what gets written to the Cassandra table. I read that the Kafka Connect sink for Cassandra uses Cassandra's JSON insert API:
https://docs.datastax.com/en/cql-oss/3.3/cql/cql_using/useInsertJSON.html
I tried several formats for the insert, but none of them worked. For example:
INSERT INTO keyspace1.table1 JSON '{ "lhlt" : "key 1", "last_updated_on_cassandra" : now() }';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Could not decode JSON string as a map: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'now': was expecting 'null', 'true', 'false' or NaN
at [Source: { "lhlt" : "key 1", "last_updated_on_cassandra" : now() }; line: 1, column: 74]. (String was: { "lhlt" : "key 1", "last_updated_on_cassandra" : now() })"
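The CQL grammar does not support calling functions when inserting data in JSON format. The INSERT INTO ... JSON command only accepts valid JSON, and a bare now() is not a valid JSON value, which is why the parser rejects it. A plain literal works, for example: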
INSERT INTO tstamp_tbl JSON '{
"id":1,
"tstamp":"2022-10-29"
}';
You will need to use the more generic INSERT syntax if you want to call CQL functions. For example:
INSERT INTO tstamp_tbl (id, tstamp)
VALUES ( 1, toTimestamp(now()) );
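If the writes can go through your own client code instead of the sink's JSON insert, the same idea can be applied to a JSON payload. The following is only a minimal sketch under that assumption: json_to_insert is a hypothetical helper (not part of Kafka Connect or any driver) that turns a JSON document into a generic INSERT with %s placeholders for the data values and toTimestamp(now()) for the timestamp column. The table and column names are taken from the question and are assumed to be trusted identifiers.

```python
import json


def json_to_insert(table, payload, ts_column):
    """Build a generic CQL INSERT from a JSON document (hypothetical helper).

    Data values become %s placeholders; ts_column is filled in by
    Cassandra itself through toTimestamp(now()).
    Table and column names are NOT escaped, so they must be trusted.
    """
    row = json.loads(payload)
    # Ignore any client-supplied value for the timestamp column.
    row.pop(ts_column, None)
    columns = list(row)
    col_list = ", ".join(columns + [ts_column])
    values = ", ".join(["%s"] * len(columns) + ["toTimestamp(now())"])
    query = f"INSERT INTO {table} ({col_list}) VALUES ({values})"
    params = [row[c] for c in columns]
    return query, params


query, params = json_to_insert(
    "keyspace1.table1",
    '{"lhlt": "key 1"}',
    "last_updated_on_cassandra",
)
print(query)
print(params)
```

The returned query and parameter list are meant to be passed to whatever client executes the statement. The %s placeholder style is an assumption here and may need adjusting for your driver.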
Cheers!