Search code examples
apache-kafkadocker-composedebezium

CDC with debezium in docker


Currently I am trying to setup CDC using debezium, kafka connect and kafka in docker.

I've been following this guide: https://debezium.io/documentation/reference/tutorial.html where I skipped Starting a MsSQL database part because I have a local SQL Server database that is configured like showen in the link: https://debezium.io/documentation/reference/1.2/connectors/sqlserver.html#setting-up-sqlserver

My current docker compose file looks like this:

version: '2'
    services:
      zookeeper:
        image: "debezium/zookeeper:${DEBEZIUM_VERSION}"
        ports:
         - 2181:2181
         - 2888:2888
         - 3888:3888
         
      kafka:
        image: "debezium/kafka:${DEBEZIUM_VERSION}"
        ports:
          - 9092:9092
        links:
          - zookeeper
        environment:
          - ZOOKEEPER_CONNECT=zookeeper:2181 
    
      connect:
        image: "debezium/connect:${DEBEZIUM_VERSION}"
        ports: 
          - 8083:8083
        links:
          - kafka
          - zookeeper
        environment:
          - GROUP_ID=1
          - CONFIG_STORAGE_TOPIC=my_connect_configs
          - OFFSET_STORAGE_TOPIC=my_connect_offsets
          - STATUS_STORAGE_TOPIC=my_source_connect_statuses

and I have a environment variable in seperate file

 DEBEZIUM_VERSION=1.5

When I try to run this compose file it returns:

connect_1    | 2021-04-30 07:41:41,568 WARN   ||  [AdminClient clientId=adminclient-1] Connection to node -1 (/0.0.0.0:9092) could not be established. Broker may not be available.   [org.apache.kafka.clients.NetworkClient]
    connect_1    | 2021-04-30 07:41:42,673 WARN   ||  [AdminClient clientId=adminclient-1] Connection to node -1 (/0.0.0.0:9092) could not be established. Broker may not be available.   [org.apache.kafka.clients.NetworkClient]
    connect_1    | 2021-04-30 07:41:43,879 WARN   ||  [AdminClient clientId=adminclient-1] Connection to node -1 (/0.0.0.0:9092) could not be established. Broker may not be available.   [org.apache.kafka.clients.NetworkClient]
    connect_1    | 2021-04-30 07:41:44,844 INFO   ||  [AdminClient clientId=adminclient-1] Metadata update failed   [org.apache.kafka.clients.admin.internals.AdminMetadataManager]
    connect_1    | org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1619768504843, tries=1, nextAllowedTryMs=1619768504944) timed out at 1619768504844 after 1 attempt(s)
    connect_1    | Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
    connect_1    | 2021-04-30 07:41:44,845 INFO   ||  App info kafka.admin.client for adminclient-1 unregistered   [org.apache.kafka.common.utils.AppInfoParser]
    connect_1    | 2021-04-30 07:41:44,846 INFO   ||  [AdminClient clientId=adminclient-1] Metadata update failed   [org.apache.kafka.clients.admin.internals.AdminMetadataManager]
    connect_1    | org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1619768534844, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
    connect_1    | Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: fetchMetadata
    connect_1    | 2021-04-30 07:41:44,853 INFO   ||  Metrics scheduler closed   [org.apache.kafka.common.metrics.Metrics]
    connect_1    | 2021-04-30 07:41:44,853 INFO   ||  Closing reporter org.apache.kafka.common.metrics.JmxReporter   [org.apache.kafka.common.metrics.Metrics]
    connect_1    | 2021-04-30 07:41:44,853 INFO   ||  Metrics reporters closed   [org.apache.kafka.common.metrics.Metrics]
    connect_1    | 2021-04-30 07:41:44,853 ERROR  ||  Stopping due to error   [org.apache.kafka.connect.cli.ConnectDistributed]
    connect_1    | org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
    connect_1    |  at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
    connect_1    |  at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
    connect_1    |  at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:95)
    connect_1    |  at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
    connect_1    | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1619768504842, tries=1, nextAllowedTryMs=1619768504943) timed out at 1619768504843 after 1 attempt(s)
    connect_1    |  at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    connect_1    |  at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    connect_1    |  at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    connect_1    |  at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    connect_1    |  at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
    connect_1    |  ... 3 more
    connect_1    | Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1619768504842, tries=1, nextAllowedTryMs=1619768504943) timed out at 1619768504843 after 1 attempt(s)
    connect_1    | Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
    docker_connect_1 exited with code 2

EDIT:

I have ran into a new problem while trying to register sql server database. My connection string:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d "{ \"name\": \"store-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.sqlserver.SqlServerConnector\", \"database.hostname\": \"localhost\", \"database.port\": \"1433\", \"database.user\": \"sa\", \"database.password\": \"*********\", \"database.dbname\": \"DebeziumKafkaDataBase\", \"database.server.name\": \"MSSQLSERVER\", \"table.whitelist\": \"dbo.Customers\", \"database.history.kafka.bootstrap.servers\": \"kafka:9092\", \"database.history.kafka.topic\": \"dbhistory.MSSQLSERVER\" } }"

In debezium connect container i get error:

Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The TCP/IP connection to the host localhost, port 1433 has failed. Error: "Connection refused (Connection refused). Verify the connection properties. Make sure that an instance of SQL Server is running on the host and accepting TCP/IP connections at the port. Make sure that TCP connections to the port are not blocked by a firewall.".

I've tried adding rule to firewall to allow port 1433 and setting protocols for my DB in SQL Server configuration manager like shown here: JDBC connection failed, error: TCP/IP connection to host failed

I didn't know if i should've openned new question for this so I just posted it here.


Solution

  • You should add ADVERTISED_LISTENERS=kafka:9092 to the Kafka service and BOOTSTRAP_SERVERS=kafka:9092 to the Debezium one