Tags: mysql, apache-kafka-connect, debezium

How to configure Debezium to use specific column as Kafka message key?


By default, Debezium uses the table's primary key as the message key. For example, given the table

create table users
(
    id            bigint auto_increment primary key,
    department_id bigint
);

with data

+----+----------------+
| id | department_id  |
+----+----------------+
|  5 |              1 |
|  6 |              1 |
|  7 |              2 |
+----+----------------+

Debezium will produce the following Kafka messages:

Key: {"id": 5} Value: {"id": 5, "department_id": 1}
Key: {"id": 6} Value: {"id": 6, "department_id": 1}
Key: {"id": 7} Value: {"id": 7, "department_id": 2}

How can I configure Debezium to use department_id, or any other column, as the Kafka message key?


Solution

  • Use the message.key.columns connector property. Each entry has the form <fully-qualified table name>:<comma-separated key columns>. Set it in your connector's configuration like this:

    {
      "name": "my-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.whitelist": "my_database",
        ...
        "message.key.columns": "my_database.users:department_id"
      }
    }
    

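    This property is supported by all relational Debezium connectors. Note that newer Debezium releases rename database.whitelist to database.include.list, so adjust that line to match your version.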
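
To make the effect concrete, here is a minimal Python sketch of how the key changes for the rows in the question. It only models the column selection and is not Debezium code; `build_key` is a hypothetical helper, and the simplified key/value shapes follow the question rather than the real change-event envelope.

```python
# Rows from the `users` table in the question.
rows = [
    {"id": 5, "department_id": 1},
    {"id": 6, "department_id": 1},
    {"id": 7, "department_id": 2},
]


def build_key(row, key_columns):
    """Hypothetical helper: keep only the configured key columns of a row."""
    return {column: row[column] for column in key_columns}


# Default behaviour: the primary key column forms the message key.
default_keys = [build_key(row, ["id"]) for row in rows]

# With "message.key.columns": "my_database.users:department_id".
custom_keys = [build_key(row, ["department_id"]) for row in rows]

for key, row in zip(custom_keys, rows):
    print("Key:", key, "Value:", row)
```

With this setting, the rows with id 5 and 6 share the same key. Assuming the default Kafka partitioner and identical serialized keys, they would be routed to the same partition, which is usually the reason for choosing a custom key column.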

    You can find more information here:

    https://debezium.io/blog/2019/09/26/debezium-0-10-0-cr2-released/

    https://debezium.io/documentation/reference/1.0/assemblies/cdc-mysql-connector/as_deploy-the-mysql-connector.html#mysql-connector-configuration-properties_debezium
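
If you need custom keys for more than one table, the property takes several entries separated by semicolons, with the columns of each entry separated by commas. The sketch below builds that value and the connector payload in Python. The helper names, the `my_database.orders` table with its columns, and the Kafka Connect URL `http://localhost:8083` are assumptions for illustration only; the connection settings elided in the answer above stay elided here.

```python
import json
import urllib.request


def message_key_columns(mapping):
    """Render {table: [columns]} as '<table>:<col>,<col>;<table>:<col>'."""
    return ";".join(
        f"{table}:{','.join(columns)}" for table, columns in mapping.items()
    )


def connector_payload(name, base_config, key_mapping):
    """Return the JSON body for Kafka Connect, adding message.key.columns."""
    config = dict(base_config)
    config["message.key.columns"] = message_key_columns(key_mapping)
    return {"name": name, "config": config}


def register(payload, connect_url="http://localhost:8083"):
    """POST the connector to the Kafka Connect REST API (URL is an assumption)."""
    request = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


payload = connector_payload(
    "my-connector",
    {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.whitelist": "my_database",
        # Remaining connection settings are elided, as in the answer above.
    },
    {
        "my_database.users": ["department_id"],
        # Hypothetical second table, only to show the separator syntax.
        "my_database.orders": ["customer_id", "region"],
    },
)
print(json.dumps(payload, indent=2))
```

Calling `register(payload)` would submit the connector to a running Kafka Connect worker. It is left uncalled here so the sketch runs without a cluster.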