Is "register" a reserved keyword in KSQL, and if so, how can I select a field with that name?


I am in the process of learning the Confluent platform (Kafka, KSQL, etc.). I am streaming data into a Kafka topic using Debezium with Kafka Connect. One of the fields in my database table "log" is called "register", which is a timestamp of when the record was added.

For reference the structure of table log (in the source MySQL databases) is as follows:

CREATE TABLE `log` (
  `code` varchar(9) NOT NULL,
  `register` datetime NOT NULL,
  `entry` mediumtext NOT NULL,
  PRIMARY KEY (`code`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

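I am streaming data from the "log" table in two databases into a single Kafka topic using the following configuration, which is working as intended. The router rewrites each Debezium topic name of the form server.database.table to merged.table, so both "log" tables land in merged.log.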

"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.topicRoute.replacement": "merged.$3",

I am trying to create a KSQL stream with a new key that concatenates the source database name (from the metadata generated by Debezium) with the code field from the log table, and that carries the rest of the table's fields along. The purpose is to make the derived key unique when it is sent to the sink (currently another MySQL database containing a single log table, whose contents should be an amalgamated copy of the two source databases' log tables).

The query I am attempting to run is:

SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;

However the following error ensues:

line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Statement: SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;
Caused by: line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE',
        'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE',
        'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW',
        'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY',
        'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER,
        QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException

I can't see anywhere that suggests that "register" is some kind of reserved term.

Can anyone help? Alternatively, can anyone suggest a way of changing the field name on the way in with a transform, bearing in mind that I can't flatten the message generated by Debezium because I need to be able to get to the source database name?


Solution

    1. Yes, REGISTER is a reserved word in KSQL, so it is best avoided as a column name in your DDL. You may be able to access the field by quoting it; that is worth trying.

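    2. There is a Single Message Transform for dropping fields, but it doesn't work on nested data. What you could try is the UnwrapFromEnvelope SMT combined with one to rename the field. Be aware that unwrapping keeps only the "after" state, so the "source" metadata would no longer be in the value. I've not tried this config, but something like: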

      "transforms": "unwrap,renameField",
      "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
      "transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.renameField.renames": "register:notareservedword",