I have the following connector declared with ksqlDB:
CREATE
SOURCE CONNECTOR `myconn` WITH (
"name" = 'myconn',
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"tasks.max" = 1,
"database.hostname" = 'myconn-db',
"database.port" = '${dbPort}',
"database.user" = '${dbUsername}',
"database.password" = '${dbPassword}',
"database.history.kafka.topic" = 'myconn_db_history',
"database.history.kafka.bootstrap.servers" = '${bootstrapServer}',
"database.server.name" = 'myconn_db',
"database.allowPublicKeyRetrieval" = '${allowPublicKeyRetrieval}',
"table.include.list" = 'myconn.links,myconn.imports',
"message.key.columns" = 'myconn.links:id',
"tombstones.on.delete" = true,
"null.handling.mode" = 'keep',
"transforms" = 'unwrap',
"transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState',
"transforms.unwrap.drop.tombstones" = false,
"transforms.unwrap.delete.handling.mode" = 'none'
);
Tombstones are sent successfully, but the key in the messages is Struct(id=00000). To change the key to the plain value 00000, I used the ExtractField$Key transform:
CREATE
SOURCE CONNECTOR `myconn` WITH (
"name" = 'myconn',
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"tasks.max...
--- I omit all the rest for convenience ---
"transforms" = 'unwrap,extractKey',
---New lines added (next 3)
"transforms.extractKey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractKey.field" = 'id',
"include.schema.changes" = false
);
After adding just the last three lines, the keys are correct, but the tombstones disappear: there is no tombstone in the topic. Do you know the reason?
As you can see, I have more than one table in the include list (table.include.list). The second one has a differently named id field: not 'id' but 'import_id'. It seems that internally that field couldn't be extracted, and the tombstones (for all tables) were dropped.
I'm not sure what causes that behavior (no errors were reported by describe connector myconn; something like 'key id not found' would have been useful), but I solved the issue by handling each topic with its own key.
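On the missing error message: a possible way to surface transform failures is to turn on Kafka Connect's error logging for the connector. The fragment below is only a sketch. It assumes the standard Connect properties errors.log.enable and errors.log.include.messages are honored by this worker, and it would be added to the WITH (...) clause of the connector rather than run on its own.

```sql
-- Assumed additions to the connector's WITH (...) clause (not part of the original definition).
-- errors.log.enable: write details of failed operations to the Connect worker log.
-- errors.log.include.messages: also log the record that failed (may expose row data).
"errors.log.enable" = true,
"errors.log.include.messages" = true

-- Afterwards, check the connector and task state again from ksqlDB:
DESCRIBE CONNECTOR `myconn`;
```

Any failure details would then be expected in the Connect worker log rather than in the ksqlDB output.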
Here is the new connector definition, which applies a separate key-extraction transform to each topic:
CREATE
SOURCE CONNECTOR `myconn` WITH (
"name" = 'myconn',
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"tasks.max" = 1,
--Database config--------------------------
"database.hostname" = 'myconn-db',
"database.port" = '${dbPort}',
"database.user" = '${dbUsername}',
"database.password" = '${dbPassword}',
"database.history.kafka.topic" = 'myconn_db_history',
"database.history.kafka.bootstrap.servers" = '${bootstrapServer}',
"database.server.name" = 'myconn_db',
"database.allowPublicKeyRetrieval" = '${allowPublicKeyRetrieval}',
"table.include.list" = 'myconn.links,myconn.imports',
--Connector behavior------------------------
"tombstones.on.delete" = true,
"null.handling.mode" = 'keep',
"include.schema.changes" = false,
--Predicates--------------------------------
"predicates" = 'TopicDoestHaveIdField,IsImportTopic',
"predicates.TopicDoestHaveIdField.type" = 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches',
"predicates.TopicDoestHaveIdField.pattern" = 'myconn_db.myconn\.(imports)',
"predicates.IsImportTopic.type" = 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches',
"predicates.IsImportTopic.pattern" = 'myconn_db.myconn.imports',
--Transforms--------------------------------
"transforms" = 'unwrap,extractKey,extractImportKey',
"transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState',
"transforms.unwrap.drop.tombstones" = false,
"transforms.unwrap.delete.handling.mode" = 'none',
"transforms.extractKey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractKey.field" = 'id',
"transforms.extractKey.predicate" = 'TopicDoestHaveIdField',
"transforms.extractKey.negate" = true,
"transforms.extractImportKey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractImportKey.field" = 'import_id',
"transforms.extractImportKey.predicate" = 'IsImportTopic'
);
Now I have the tombstones in the topics and rows are properly removed from tables.
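To check this from ksqlDB, the topics can be inspected with PRINT. This is a minimal sketch: the topic names assume the server.database.table naming implied by the predicate patterns above, and the LIMIT value is arbitrary.

```sql
-- Topic names are assumed from the predicate patterns (myconn_db.myconn.<table>).
-- After deleting a row, a tombstone should show up as a record
-- with the plain key (e.g. 00000) and a null value.
PRINT 'myconn_db.myconn.links' FROM BEGINNING LIMIT 10;
PRINT 'myconn_db.myconn.imports' FROM BEGINNING LIMIT 10;
```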