sql-server docker apache-kafka apache-kafka-connect debezium

Debezium issue with SQL Server


I followed the tutorial on debezium.io and adapted it to use a Microsoft SQL Server database instead of MySQL, but the watcher doesn't show any events when I change data in the database. Here are the steps I took:

  1. I ran the zookeeper docker command:

    docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.1

  2. then I ran the kafka docker command:

    docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.1

  3. then I ran a SQL Server docker command with agent enabled:

    docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=yourStrong(!)Password' -e 'MSSQL_AGENT_ENABLED=True' --name mssql -p 1433:1433 -d mcr.microsoft.com/mssql/server:2019-latest

  4. then I connected to the SQL Server instance and created a database called PeopleDb and created a table called People by running the below query:

    USE [PeopleDb]
    GO

    CREATE TABLE [dbo].[People](
        [Id] [bigint] IDENTITY(1,1) NOT NULL,
        [FirstName] [varchar](50) NOT NULL,
        [LastName] [varchar](50) NOT NULL,
        CONSTRAINT [PK_People] PRIMARY KEY CLUSTERED ([Id] ASC)
            WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
    ) ON [PRIMARY]
    GO

  5. then I ran the below commands to enable CDC:

    USE PeopleDb
    GO

    EXEC sys.sp_cdc_enable_db

    EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'People', @role_name = Null, @filegroup_name = N'Primary',@supports_net_changes = 0

    EXEC sys.sp_cdc_help_change_data_capture

  6. I noticed the cdc tables were created under 'System Tables'. When I ran the below query, a record was added to the cdc.dbo_People_CT table:

    INSERT INTO dbo.People (FirstName, LastName) VALUES ('John', 'Smith')

  7. then I ran the connector docker command:

    docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mssql:mssql debezium/connect:1.1

  8. then I deployed a connector by POSTing the JSON below to http://localhost:8083/connectors/:

   {
       "name": "people-connector",
       "config": {
           "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
           "tasks.max": "1",
           "database.hostname": "mssql",
           "database.port": "1433",
           "database.user": "sa",
           "database.password": "yourStrong(!)Password",
           "database.dbname": "PeopleDb",
           "database.server.id": "184054",
           "database.server.name": "mssql",
           "database.history.kafka.bootstrap.servers": "kafka:9092",
           "database.history.kafka.topic": "mssql.dbo.people",
           "name": "people-connector"
       },
       "tasks": [],
       "type": "source"
   }

  9. I verified that the newly added connector was running by checking http://localhost:8083/connectors/people-connector/status:

    {"name":"people-connector","connector":{"state":"RUNNING","worker_id":"172.17.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.17.0.5:8083"}],"type":"source"}

  10. I ran the watcher docker command:

    docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.1 watch-topic -a -k mssql.dbo.people

which produced the below:

WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.6:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic mssql.dbo.people:
null    {
  "source" : {
    "server" : "mssql"
  },
  "position" : {
    "transaction_id" : null,
    "event_serial_no" : 1,
    "commit_lsn" : "00000025:000003f8:0003",
    "change_lsn" : "NULL"
  },
  "databaseName" : "PeopleDb",
  "schemaName" : "dbo",
  "tableChanges" : [ {
    "type" : "CREATE",
    "id" : "\"PeopleDb\".\"dbo\".\"People\"",
    "table" : {
      "defaultCharsetName" : null,
      "primaryKeyColumnNames" : [ "Id" ],
      "columns" : [ {
        "name" : "Id",
        "jdbcType" : -5,
        "typeName" : "bigint identity",
        "typeExpression" : "bigint identity",
        "charsetName" : null,
        "length" : 19,
        "scale" : 0,
        "position" : 1,
        "optional" : false,
        "autoIncremented" : false,
        "generated" : false
      }, {
        "name" : "FirstName",
        "jdbcType" : 12,
        "typeName" : "varchar",
        "typeExpression" : "varchar",
        "charsetName" : null,
        "length" : 50,
        "position" : 2,
        "optional" : false,
        "autoIncremented" : false,
        "generated" : false
      }, {
        "name" : "LastName",
        "jdbcType" : 12,
        "typeName" : "varchar",
        "typeExpression" : "varchar",
        "charsetName" : null,
        "length" : 50,
        "position" : 3,
        "optional" : false,
        "autoIncremented" : false,
        "generated" : false
      } ]
    }
  } ]
}

After completing these steps and confirming that the connector was running, I expected to see new events when inserting, updating, or deleting records in the People table, but the watcher showed no activity. Does anyone know why there seems to be a disconnect between Debezium and SQL Server?


Solution

  • According to the Debezium SQL Server Connector docs:

    The SQL Server connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. The name of the Kafka topics always takes the form serverName.schemaName.tableName, where serverName is the logical name of the connector as specified with the database.server.name configuration property, schemaName is the name of the schema where the operation occurred, and tableName is the name of the database table on which the operation occurred.

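    In your case, database.server.name is mssql, so to view change events for the dbo.People table you need to watch mssql.dbo.People (note that topic names in Kafka are case sensitive). The topic you watched, mssql.dbo.people, is the one you configured as database.history.kafka.topic, which is why it contains only the schema definition of the table and no row-level changes.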
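
As a rough sketch of the naming rule quoted above, the following builds the topic name from the values in your connector config and wraps the watcher command from the question so it can be pointed at that topic. The helper names `topic_name` and `watch_topic` are made up for illustration and are not part of Debezium; the docker command itself is the one from the question with only the topic changed.

```shell
#!/bin/sh
# Hypothetical helper: compose serverName.schemaName.tableName.
# The parts are used verbatim because Kafka topic names are case sensitive.
topic_name() {
    printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Hypothetical helper: start the watcher container from the question
# on the topic passed as the first argument.
watch_topic() {
    docker run -it --rm --name watcher \
        --link zookeeper:zookeeper --link kafka:kafka \
        debezium/kafka:1.1 watch-topic -a -k "$1"
}

# database.server.name = mssql, schema = dbo, table = People
TOPIC=$(topic_name mssql dbo People)
echo "$TOPIC"    # prints mssql.dbo.People

# Uncomment to start watching (needs the zookeeper and kafka containers running):
# watch_topic "$TOPIC"
```

With the watcher attached to this topic, an insert, update, or delete on dbo.People should show up as a change event, assuming the connector is still in the RUNNING state.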