Tags: docker, apache-kafka, apache-kafka-connect, debezium, confluent-platform

Debezium SQL Server Source Connector set Kafka broker


I'm trying to set up a Docker environment for the Confluent Kafka Platform and integrate it with the Debezium SQL Server Source Connector.

I followed Confluent's guide for the Kafka platform, then this Debezium tutorial for the SQL Server Source Connector.

My broker container is named broker. It and all the other containers (including the connect container) are on the same network, and I made sure they can ping and telnet each other.

In the Debezium tutorial, I got stuck at the step "Start the Debezium SQL Server connector" because I receive an error indicating that the connector is trying to reach the Kafka broker at localhost:9092 instead of broker:9092:

[2020-03-29 10:46:30,907] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,114] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,969] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:34,127] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:35,333] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:36,238] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Until it finally times out:

[2020-03-29 10:46:38,664] ERROR WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 60000ms

The funny thing is that I can see my configuration was received correctly at the start of the log (look for broker:9092):

[2020-03-29 10:45:38,618] INFO    database.history.kafka.bootstrap.servers = broker:9092 (io.debezium.connector.common.BaseSourceTask)

...

[2020-03-29 10:45:38,655] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [broker:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id = server1-dbhistory
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = server1-dbhistory
        group.instance.id = null

This is my configuration file, register-sqlserver.json:

{
  "name": "inventory-connector",
  "config": {
    "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max" : "1",
    "database.server.name" : "server1",
    "database.hostname" : "sqlserver_1",
    "database.port" : "1433",
    "database.user" : "sa",
    "database.password" : "Password!",
    "database.dbname" : "testDB",
    "database.history.kafka.bootstrap.servers" : "broker:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}

I register the connector from the host machine as follows (just as in the guide):

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json  
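For anyone scripting this step instead of using curl, the same registration call can be built with Python's standard library. This is a minimal sketch, assuming Kafka Connect's REST API is reachable at http://localhost:8083 as in the curl command; `build_register_request` is a hypothetical helper name, and the dict simply mirrors register-sqlserver.json. The request is only built here; the actual send is left commented out because it needs a running Connect worker.

```python
import json
import urllib.request

# Kafka Connect REST endpoint, the same URL the curl command posts to.
CONNECT_URL = "http://localhost:8083/connectors/"


def build_register_request(connector: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) the POST that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


# Same content as register-sqlserver.json.
connector = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.server.name": "server1",
        "database.hostname": "sqlserver_1",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "Password!",
        "database.dbname": "testDB",
        "database.history.kafka.bootstrap.servers": "broker:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
    },
}

req = build_register_request(connector)
print(req.get_method(), req.full_url)

# Sending it requires a running Kafka Connect worker on localhost:8083:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
```

Serializing the dict with `json.dumps` also guarantees the body is well-formed JSON, which catches the stray-comma kind of mistake before Connect rejects the request.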

The logs shown above are from the connect container.

The word localhost does not appear anywhere else in my full logs, so I don't think I've missed some other setting that defaults to localhost.

Will appreciate any help :)


Solution

  • The problem is down to advertised listeners.

    You're connecting to the broker on port 9092, which per the config is the listener that advertises its host as localhost:

    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
    

    That means that the client (Debezium, in this case) will initially connect to the bootstrap server that you give it (broker:9092), but the broker will hand back to the client the advertised host (localhost), and the client will then try to connect to that. Since they're in separate containers, localhost for the Debezium connector is not the broker, and the connection fails.

    Ref: https://rmoff.net/2018/08/02/kafka-listeners-explained/

    Solution:

    Use port 29092, which per the config above is the listener that advertises its host as broker. That name resolves correctly from the Debezium (Kafka Connect) container:

    "database.history.kafka.bootstrap.servers" : "broker:29092"
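To make the bootstrap-then-redirect behaviour concrete, here is a toy model (not Kafka client code) of which address the broker hands back depending on the bootstrap port. It assumes, as a simplification, that each listener accepts connections on the same port it advertises, which holds for the KAFKA_ADVERTISED_LISTENERS value above; the function names are hypothetical.

```python
from __future__ import annotations

from typing import NamedTuple


class Listener(NamedTuple):
    name: str
    host: str
    port: int


def parse_advertised_listeners(value: str) -> list[Listener]:
    """Parse a string like 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'."""
    listeners = []
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners.append(Listener(name, host, int(port)))
    return listeners


def address_returned_to_client(bootstrap: str, advertised: str) -> str:
    """Return the host:port the broker advertises back after a client
    bootstraps via `bootstrap`.

    Simplifying assumption: the bootstrap port identifies the listener,
    because each listener binds the same port it advertises.
    """
    _, _, port = bootstrap.rpartition(":")
    for listener in parse_advertised_listeners(advertised):
        if listener.port == int(port):
            return f"{listener.host}:{listener.port}"
    raise ValueError(f"no listener on port {port}")


ADVERTISED = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"

for bootstrap in ("broker:9092", "broker:29092"):
    returned = address_returned_to_client(bootstrap, ADVERTISED)
    # Inside another container, "localhost" is that container itself,
    # not the broker, so a localhost address is unusable there.
    verdict = "fails from other containers" if returned.startswith("localhost:") else "ok"
    print(f"{bootstrap} -> {returned} ({verdict})")
```

Bootstrapping on broker:9092 succeeds at first (the name and port are reachable) but the client is then redirected to localhost:9092, which matches the repeated "Connection to node 1 (localhost/127.0.0.1:9092)" warnings in the question; bootstrapping on broker:29092 gets broker:29092 back.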