I'm trying to set up a Docker environment for the Confluent Kafka Platform and integrate it with the Debezium SQL Server Source Connector.
I followed Confluent's guide for the Kafka platform, then the Debezium tutorial for the SQL Server Source Connector.
My broker container is named broker. This container and all the other containers (including the connect container) are on the same network, and I made sure they can ping and telnet each other.
In the Debezium tutorial, I'm stuck at the step "Start The Debezium SQL Server Connector" because I'm receiving an error which indicates that the connector is trying to access the Kafka broker through localhost:9092 instead of broker:9092:
[2020-03-29 10:46:30,907] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,114] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,969] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:34,127] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:35,333] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:36,238] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Until it finally times out:
[2020-03-29 10:46:38,664] ERROR WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 60000ms
The funny thing is that I can see that my configuration is received successfully at the start of the log (look for broker:9092):
[2020-03-29 10:45:38,618] INFO database.history.kafka.bootstrap.servers = broker:9092 (io.debezium.connector.common.BaseSourceTask)
...
[2020-03-29 10:45:38,655] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [broker:9092]
check.crcs = true
client.dns.lookup = default
client.id = server1-dbhistory
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = server1-dbhistory
group.instance.id = null
This is my configuration file, register-sqlserver.json:
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.server.name": "server1",
        "database.hostname": "sqlserver_1",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "Password!",
        "database.dbname": "testDB",
        "database.history.kafka.bootstrap.servers": "broker:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}
I add the connector via the host machine as follows (just like the guide):
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
The logs shown above are the output of the connect container.
The word localhost does not appear anywhere else in my full logs, so I don't think I've missed some other setting that defaults to localhost.
Will appreciate any help :)
The problem is down to advertised listeners.
You're connecting to the broker on port 9092, which per the config belongs to the listener that advertises its host as localhost:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
That means that the client (Debezium, in this case) will initially connect to the bootstrap server that you give it (broker:9092), but the broker will hand back to the client the advertised host (localhost), and the client will then try to connect to that. Since they're in separate containers, localhost for the Debezium connector is not the broker, and the connection fails.
Ref: https://rmoff.net/2018/08/02/kafka-listeners-explained/
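To make the handoff concrete, here is a minimal Python sketch that parses the KAFKA_ADVERTISED_LISTENERS value quoted above and shows which address a client is sent to for each bootstrap port. It is a simplified model, not the Kafka client's actual code: the function names are made up for illustration, and it assumes each listener is bound to the same port it advertises, so that the bootstrap port selects the listener.

```python
from typing import Dict, Tuple

# Value quoted from the broker configuration above.
ADVERTISED_LISTENERS = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"


def advertised_by_port(value: str) -> Dict[int, Tuple[str, str]]:
    """Map each port to the (listener name, advertised host) declared for it."""
    result = {}
    for entry in value.split(","):
        name, address = entry.split("://")
        host, port = address.rsplit(":", 1)
        result[int(port)] = (name, host)
    return result


def address_after_bootstrap(bootstrap: str) -> str:
    """Return the host:port a client is told to use after bootstrapping.

    Assumption: the port used for bootstrap determines which listener,
    and therefore which advertised host, the broker answers with.
    """
    _host, port = bootstrap.rsplit(":", 1)
    _name, advertised_host = advertised_by_port(ADVERTISED_LISTENERS)[int(port)]
    return f"{advertised_host}:{port}"


# Compare the setting from the question with the other listener's port.
for servers in ("broker:9092", "broker:29092"):
    print(f"bootstrap {servers} -> client reconnects to {address_after_bootstrap(servers)}")
```

Bootstrapping on 9092 sends the client to localhost:9092, which inside the connect container is the connect container itself. That matches the "Connection to node 1 (localhost/127.0.0.1:9092) could not be established" warnings in the log.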
Solution:
Use port 29092, which per the above config belongs to the listener that advertises broker as its host. That name resolves correctly from the Debezium container:
"database.history.kafka.bootstrap.servers" : "broker:29092"