apache-kafka apache-kafka-connect debezium

How to separate database.history messages from row-change messages in a Debezium connector?


I've created a Debezium connector in Kafka Connect. It works, but as soon as I add any SMT it breaks, because messages for DDL changes (for example, changes to a table schema) have a different structure from messages for row changes.

{
    "name": "test_debezium",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "kafkaconnect",
        "database.password": "***",
        "database.server.name": "test-debezium",
        "database.include.list": "database",
        "table.include.list": "database.table",
        "database.history.kafka.topic": "test_debezium_history",
        "database.history.kafka.recovery.poll.interval.ms": 5000,
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "transforms": "createKey,extractInt",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "client_id",
        "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractInt.field": "client_id"
    }
}

How can I separate the database.history messages from the messages on the database.table topic, which describe row changes?

Example of a DDL message:

{"databaseName":"database"}-{"source":{"version":"1.5.0-SNAPSHOT","connector":"mysql","name":"test-debezium","ts_ms":0,"snapshot":{"string":"true"},"db":"database","table":null,"server_id":0,"gtid":null,"file":"mysql-bin.000","pos":000,"row":0,"thread":null,"query":null},"databaseName":"database","ddl":"CREATE DATABASE IF EXISTS `database`"}

Example of a row message:

{"client_id":328}-{"before":null,"after":{"test-debezium.database.table.Value":{"client_id":328,"first_name":"Ignacy","uuid":"000"}},"source":{"version":"1.5.0-SNAPSHOT","connector":"mysql","name":"test-debezium","ts_ms":0,"snapshot":{"string":"true"},"db":"database","table":{"string":"table"},"server_id":0,"gtid":null,"file":"mysql-bin.000","pos":000,"row":0,"thread":null,"query":null},"op":"c","ts_ms":{"long":1611301059407},"transaction":null}

So the same key extractor doesn't work for both kinds of message. How can I use a different extractor for different topics, or apply the extractor only to row messages?

Thanks!


Solution

  • You can use the predicate support that was added to Single Message Transforms in Apache Kafka 2.6.

    You need to specify your predicate, which we'll base on the topic name. Because the target topic for the table data could vary, we'll build the predicate against the fixed history topic name:

    "predicates.isHistoryTopic.type"   : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isHistoryTopic.pattern": "test_debezium_history"
    

    Then, for each Single Message Transform whose execution we want to limit, we add the predicate to the transform configuration. Since we want the transform not to execute for messages that match the predicate, we also add the negate option.

    The final config looks like this:

    "transforms"                       : "createKey,extractInt",
    "transforms.createKey.type"        : "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields"      : "client_id",
    "transforms.createKey.predicate"   : "isHistoryTopic",
    "transforms.createKey.negate"      : "true",
    
    "transforms.extractInt.type"       : "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field"      : "client_id",
    "transforms.extractInt.predicate"  : "isHistoryTopic",
    "transforms.extractInt.negate"     : "true",
    
    "predicates"                       : "isHistoryTopic",
    "predicates.isHistoryTopic.type"   : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isHistoryTopic.pattern": "test_debezium_history"
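
    For orientation, here is a sketch of how those keys might sit inside the full connector definition from the question. It is simply the question's config merged with the fragment above (with the hostname quoting corrected). It has not been run against a live cluster and assumes the Connect worker is on Kafka 2.6 or later:

    ```json
    {
        "name": "test_debezium",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": "localhost",
            "database.port": "3306",
            "database.user": "kafkaconnect",
            "database.password": "***",
            "database.server.name": "test-debezium",
            "database.include.list": "database",
            "table.include.list": "database.table",
            "database.history.kafka.topic": "test_debezium_history",
            "database.history.kafka.recovery.poll.interval.ms": 5000,
            "database.history.kafka.bootstrap.servers": "localhost:9092",

            "transforms": "createKey,extractInt",
            "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
            "transforms.createKey.fields": "client_id",
            "transforms.createKey.predicate": "isHistoryTopic",
            "transforms.createKey.negate": "true",
            "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
            "transforms.extractInt.field": "client_id",
            "transforms.extractInt.predicate": "isHistoryTopic",
            "transforms.extractInt.negate": "true",

            "predicates": "isHistoryTopic",
            "predicates.isHistoryTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
            "predicates.isHistoryTopic.pattern": "test_debezium_history"
        }
    }
    ```

    The same predicate name is referenced by both transforms, so it only has to be declared once under predicates.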
    

    This blog and video go into more detail of the Predicate option.
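
    One caveat worth checking, offered as an assumption rather than something confirmed in this thread: Debezium's MySQL connector also publishes schema change events to a topic named after database.server.name (here that would be test-debezium), which is separate from the history topic. If the DDL messages are arriving on that topic, a predicate on the history topic name will not exclude them. A possible alternative is to match the row topic itself and leave out negate, so the transforms run only on that topic. The predicate name isTableTopic is made up for this example, and the pattern assumes the row topic follows the serverName.databaseName.tableName convention (test-debezium.database.table); adjust it to the actual topic name:

    ```json
    "transforms"                       : "createKey,extractInt",
    "transforms.createKey.type"        : "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields"      : "client_id",
    "transforms.createKey.predicate"   : "isTableTopic",

    "transforms.extractInt.type"       : "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field"      : "client_id",
    "transforms.extractInt.predicate"  : "isTableTopic",

    "predicates"                       : "isTableTopic",
    "predicates.isTableTopic.type"     : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isTableTopic.pattern"  : "test-debezium\\.database\\.table"
    ```

    The pattern is a Java regular expression, so the dots are escaped to match literally. To cover several tables, a broader pattern such as "test-debezium\\.database\\..*" could be used instead.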