I've created a Debezium connector in Kafka Connect. It works, but as soon as I add any SMT it breaks, because messages for DDL changes (for example, changes to a table schema) have a different structure from messages for row changes.
{
"name": "test_debezium",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "kafkaconnect",
"database.password": "***",
"database.server.name": "test-debezium",
"database.include.list": "database",
"table.include.list": "database.table",
"database.history.kafka.topic": "test_debezium_history",
"database.history.kafka.recovery.poll.interval.ms": 5000,
"database.history.kafka.bootstrap.servers": "localhost:9092",
"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "client_id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "client_id"
}
}
How can I separate the database.history messages from the messages on the database.table topic, which carry the row changes?
example of DDL message:
{"databaseName":"database"}-{"source":{"version":"1.5.0-SNAPSHOT","connector":"mysql","name":"test-debezium","ts_ms":0,"snapshot":{"string":"true"},"db":"database","table":null,"server_id":0,"gtid":null,"file":"mysql-bin.000","pos":000,"row":0,"thread":null,"query":null},"databaseName":"database","ddl":"CREATE DATABASE IF EXISTS `database`"}
example of row message:
{"client_id":328}-{"before":null,"after":{"test-debezium.database.table.Value":{"client_id":328,"first_name":"Ignacy","uuid":"000"}},"source":{"version":"1.5.0-SNAPSHOT","connector":"mysql","name":"test-debezium","ts_ms":0,"snapshot":{"string":"true"},"db":"database","table":{"string":"table"},"server_id":0,"gtid":null,"file":"mysql-bin.000","pos":000,"row":0,"thread":null,"query":null},"op":"c","ts_ms":{"long":1611301059407},"transaction":null}
So the same key extractor doesn't work for both. How can I use a different extractor for different topics, or apply the extractor only to row messages?
Thanks!
You can use the Predicate option that was added to Single Message Transforms in Apache Kafka 2.6.
You need to specify your predicate, which we'll base on the topic name. Because the target topic for the table data could vary, we'll build the predicate against the fixed history topic name:
"predicates.isHistoryTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHistoryTopic.pattern": "test_debezium_history"
Then, for each Single Message Transform whose execution we want to limit, we add the predicate option to the transform configuration. Since we want the transform to be skipped for messages that match the predicate, we also add the negate option.
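To make the effect of predicate plus negate more concrete, here is a rough Python model of the decision. It is only a sketch, not Kafka Connect code: the names `is_history_topic`, `extract_key_field` and `apply_if` are made up for illustration, the toy transform just raises an error when the field is missing, and the DDL record is placed on the history topic as assumed above. As far as I understand, TopicNameMatches requires the pattern to match the whole topic name, which is modelled here with `re.fullmatch`.

```python
import re
from typing import Any, Callable, Dict

Record = Dict[str, Any]  # toy record with just a "topic" and a "key"


def is_history_topic(record: Record) -> bool:
    # Stand-in for the isHistoryTopic predicate: the pattern must match
    # the entire topic name, not just a part of it.
    return re.fullmatch("test_debezium_history", record["topic"]) is not None


def extract_key_field(record: Record, field: str = "client_id") -> Record:
    # Toy version of ExtractField$Key: replace the key with one of its fields.
    # Raises KeyError when the field is absent from the key.
    return {**record, "key": record["key"][field]}


def apply_if(
    transform: Callable[[Record], Record],
    predicate: Callable[[Record], bool],
    negate: bool,
    record: Record,
) -> Record:
    # The transform runs when the predicate matches, or, with negate=True,
    # when it does NOT match. Otherwise the record passes through unchanged.
    matched = predicate(record)
    should_apply = (not matched) if negate else matched
    return transform(record) if should_apply else record


row = {"topic": "test-debezium.database.table", "key": {"client_id": 328}}
ddl = {"topic": "test_debezium_history", "key": {"databaseName": "database"}}

# Row message: predicate is false, negate flips it, so the key is extracted.
print(apply_if(extract_key_field, is_history_topic, True, row)["key"])
# DDL message: predicate is true, negate flips it, so the transform is skipped.
print(apply_if(extract_key_field, is_history_topic, True, ddl)["key"])
```

Without the predicate, the toy transform would fail on the DDL record because its key has no `client_id` field; with it, the record is passed through untouched.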
The final config looks like this:
"transforms" : "createKey,extractInt",
"transforms.createKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields" : "client_id",
"transforms.createKey.predicate" : "isHistoryTopic",
"transforms.createKey.negate" : "true",
"transforms.extractInt.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field" : "client_id",
"transforms.extractInt.predicate" : "isHistoryTopic",
"transforms.extractInt.negate" : "true",
"predicates" : "isHistoryTopic",
"predicates.isHistoryTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHistoryTopic.pattern": "test_debezium_history"
This blog and video go into more detail on the Predicate option.