Search code examples
apache-kafkacassandraapache-kafka-connect

Insert the evaluation of a function in cassandra with JSON


I am using kafka connect to sink some data from a kafka topic to a cassandra table. I want to add a column with a timestamp when the insert/update happens into cassandra. That is easy with postgres with functions and triggers. To use the same in cassandra that is not going to happen. I can not add java code to the cassandra cluster. So I am thinking on how to add this value injecting a now() at some point of the kafka connect and inserted on the cassandra table as the result of the execution of the function. I read kafka connect to cassandra uses de JSON cassandra insert api.

https://docs.datastax.com/en/cql-oss/3.3/cql/cql_using/useInsertJSON.html

I tried to insert with different formats but nothing worked.

INSERT INTO keyspace1.table1 JSON '{   "lhlt" : "key 1",    "last_updated_on_cassandra" : now()   }';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Could not decode JSON string as a map: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'now': was expecting 'null', 'true', 'false' or NaN
 at [Source: {   "lhlt" : "key 1",    "last_updated_on_cassandra" : now()   }; line: 1, column: 74]. (String was: {   "lhlt" : "key 1",    "last_updated_on_cassandra" : now()   })"

Solution

  • The CQL grammar does not support the use of functions when inserting data in JSON format.

    The INSERT INTO ... JSON command only accepts valid JSON. For example:

    INSERT INTO tstamp_tbl JSON '{
      "id":1,
      "tstamp":"2022-10-29"
    }';
    

    You will need to use the more generic INSERT syntax if you want to call CQL functions. For example:

    INSERT INTO tstamp_tbl (id, tstamp)
      VALUES ( 1, toTimestamp(now()) );
    

    Cheers!