sql-server apache-kafka-connect debezium

Debezium SQL Server source connector streams events only once per day


I am facing a problem where my Debezium SQL Server source connector does not stream CDC changes in real time. Instead, it streams all events once per day, always at the same time (19:07:29). Below is a sample record:

      "source":{
         "version":"1.8.1.Final",
         "connector":"sqlserver",
         "name":"dataroom-rds",
         "ts_ms":1655275673757,
         "snapshot":"false",
         "db":"dbDataRoom",
         "sequence":null,
         "schema":"dbo",
         "table":"tblFolder",
         "change_lsn":"0056e213:00000e55:000a",
         "commit_lsn":"0056e213:00000ecd:0014",
         "event_serial_no":1
      },
      "op":"c",
      "ts_ms":1655284049349,
      "transaction":null

Note the difference between source.ts_ms and ts_ms: source.ts_ms is 15 June 2022 16:47:53.757, but ts_ms is 15 June 2022 19:07:29.349. The documentation says that this difference indicates a lag, but it doesn't give any more information.

https://debezium.io/documentation/reference/connectors/sqlserver#sqlserver-change-event-values

In the source object, ts_ms indicates the time when a change was committed in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.
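To make the comparison described in the documentation concrete, here is a minimal sketch that computes the lag for the sample record above. The helper names `to_utc` and `debezium_lag` are made up for illustration; only the two timestamp values come from the question. Both fields are epoch milliseconds, so the times quoted in the question appear to be in a local zone ten hours ahead of UTC.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_utc(ts_ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=ts_ms)


def debezium_lag(source_ts_ms: int, ts_ms: int) -> timedelta:
    """Delay between the database commit and Debezium processing the event."""
    return timedelta(milliseconds=ts_ms - source_ts_ms)


# Values copied from the sample record in the question.
source_ts_ms = 1655275673757  # payload.source.ts_ms
ts_ms = 1655284049349         # payload.ts_ms

lag = debezium_lag(source_ts_ms, ts_ms)
print("committed (UTC):", to_utc(source_ts_ms).isoformat())
print("processed (UTC):", to_utc(ts_ms).isoformat())
print("lag:", lag)  # lag: 2:19:35.592000
```

For this record the lag is about 2 hours 20 minutes. Running the same calculation over several records from one daily batch would show whether the lag grows with how early in the day the change was committed.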

Below is the connector config

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mssql-connector-dbdataroom-raw
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.debezium.connector.sqlserver.SqlServerConnector
  tasksMax: 1
  config:
    database.server.id: "1"
    database.server.name: "dataroom-rds"
    database.hostname: "${secrets:kafka/dataroom-rds-creds:host}"
    database.port: "1433"
    database.user: "${secrets:kafka/dataroom-rds-creds:username}"
    database.password: "${secrets:kafka/dataroom-rds-creds:password}"
    database.dbname: "${secrets:kafka/dataroom-rds-creds:dbname}"
    table.include.list: "dbo.tblFolder,dbo.tblDocument,dbo.tblAllowedSecurityGroupFolder,dbo.tblAllowedSecurityGroupFolderHistory"
    snapshot.isolation.mode: "read_committed"
    snapshot.lock.timeout.ms: "-1"
    poll.interval.ms: "2000"

    database.history.kafka.bootstrap.servers: KUSTOMIZE_REPLACEMENT
    database.history.kafka.topic: "schema-changes.dbdataroom"

    database.history.producer.security.protocol: "SASL_SSL"
    database.history.producer.sasl.mechanism: "SCRAM-SHA-512"
    database.history.producer.sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username=${secrets:kafka/kafka-connect-msk-secrets:msk_sasl_user} password=${secrets:kafka/kafka-connect-msk-secrets:msk_sasl_password} ;"

    database.history.consumer.security.protocol: "SASL_SSL"
    database.history.consumer.sasl.mechanism: "SCRAM-SHA-512"
    database.history.consumer.sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username=${secrets:kafka/kafka-connect-msk-secrets:msk_sasl_user} password=${secrets:kafka/kafka-connect-msk-secrets:msk_sasl_password} ;"

    transforms: "changeTopicCase"
    # This transform changes the case of tables names to lowercase to match the topic. Table names in SQL Server
    # have uppercase characters.
    transforms.changeTopicCase.type: "com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase"
    transforms.changeTopicCase.from: "UPPER_UNDERSCORE"
    transforms.changeTopicCase.to: "LOWER_UNDERSCORE"

Solution

  • Hassan, this is difficult to solve without logs. Please check the following and post more logs:

    1. Check whether CDC is enabled on the tables.

    2. Query the CDC change table in SQL Server. Changes are captured in the default cdc schema.

    SELECT TOP 10 * FROM cdc.dbo_<TABLE-NAME>_CT -- e.g. cdc.dbo_tblFolder_CT; add filters on time columns

    3. The only SMT is a case change, so I would not expect any issues there.

    4. Check the Kafka Connect logs. From the question, the connector is probably deployed to k8s, so log in to the pod and run:

    curl localhost:8083/connectors
    curl localhost:8083/connectors/<connector-name-from-above>/tasks/0/status

    5. Are there any other hints in the logs?

    6. Are there issues in SQL Server? sp_who2 or sp_whoisactive may give a clue.

    7. Are there any network issues between Kubernetes and the RDS SQL Server?

    While the time correlation may be valid, these checks are the way to find any issues and narrow down what you are facing.
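If the REST checks in the list above need to be repeated, for example once before and once after the daily burst, a small script can summarise the response. This is a rough sketch, not a definitive tool: it assumes the usual shape of the Kafka Connect `GET /connectors/<name>/status` response (a `connector` object and a `tasks` array, each with a `state`, plus a `trace` on failures). The function name `unhealthy_parts` and the sample payload are made up for illustration.

```python
def unhealthy_parts(status: dict) -> list:
    """Return (label, state, trace) for the connector and each task that is not RUNNING."""
    problems = []
    connector = status.get("connector", {})
    if connector.get("state") != "RUNNING":
        problems.append(("connector", connector.get("state"), connector.get("trace", "")))
    for task in status.get("tasks", []):
        if task.get("state") != "RUNNING":
            label = f"task {task.get('id')}"
            problems.append((label, task.get("state"), task.get("trace", "")))
    return problems


# Illustrative payload only (NOT real output from the asker's cluster).
# In the pod, the real document could be fetched with something like:
#   json.load(urllib.request.urlopen("http://localhost:8083/connectors/<name>/status"))
sample_status = {
    "name": "mssql-connector-dbdataroom-raw",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "10.0.0.1:8083",
         "trace": "example stack trace"},
    ],
}

for label, state, trace in unhealthy_parts(sample_status):
    print(f"{label}: {state}\n{trace}")
```

An empty result means the connector and all of its tasks report RUNNING. In that case the delay is more likely upstream, in SQL Server or the network, than in a crashed or restarting task.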