Search code examples
mysql, apache-kafka, docker-compose, apache-kafka-connect, debezium

How do I run debezium with Confluent platform?


I have an EC2 machine with a MySQL DB (not containerized). Now I want to stream CDC events to Confluent-managed Kafka. I am using this docker compose file to start the Connect platform on the same host.

# Docker Compose definition of a single Kafka Connect worker ("connect").
# At startup it installs the Debezium MySQL connector plugin and then launches
# the worker, which is configured through CONNECT_* environment variables to
# talk to a SASL_SSL-secured Kafka cluster. ${...} placeholders are filled in
# from the .env file that sits next to this compose file.
services:
  connect:
    image: confluentinc/cp-kafka-connect:latest
    # Host networking: the MySQL database runs directly on the same machine
    # (not in a container), so the connector config addresses it as localhost.
    network_mode: host
    # Exposes the variables from .env inside the container as well.
    env_file:
      - .env
    # NOTE(review): a ports mapping together with network_mode: host is
    # presumably redundant/ignored by Docker — confirm against the Compose docs.
    ports:
      - "8083:8083"
    # Replaces the image's default command with a small shell script that
    # installs the connector plugin, starts the worker in the background and
    # then keeps the container alive with `sleep infinity`.
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:2.4.2
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run & 
        #
        sleep infinity
    environment:
      # --- Worker-level connection to the Kafka cluster (SASL_SSL / PLAIN) ---
      CONNECT_BOOTSTRAP_SERVERS: "${KAFKA_BOOTSTRAP_SERVER}"
      CONNECT_TOPIC_CREATION_ENABLE: "false"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SASL_USERNAME: "${KAFKA_API_KEY}"
      CONNECT_SASL_PASSWORD: "${KAFKA_API_SECRET}"
      # The API key/secret are embedded into the JAAS login string.
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${KAFKA_API_KEY}\" password=\"${KAFKA_API_SECRET}\";"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"

      # --- Same security settings repeated with the CONNECT_CONSUMER_ prefix ---
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${KAFKA_API_KEY}\" password=\"${KAFKA_API_SECRET}\";"
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"

      # --- Same security settings repeated with the CONNECT_PRODUCER_ prefix ---
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${KAFKA_API_KEY}\" password=\"${KAFKA_API_SECRET}\";"
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"

      # --- Worker identity, internal (config/offset/status) topics, converters ---
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      # The three storage topics are named via .env.
      CONNECT_CONFIG_STORAGE_TOPIC: "${CONFIG_STORAGE_TOPIC}"
      CONNECT_OFFSET_STORAGE_TOPIC: "${OFFSET_STORAGE_TOPIC}"
      CONNECT_STATUS_STORAGE_TOPIC: "${STATUS_STORAGE_TOPIC}"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      # Registers FileConfigProvider under the alias "file" so that connector
      # configs can use ${file:<path>:<key>} placeholders.
      CONNECT_CONFIG_PROVIDERS: 'file'
      CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
    # Mounts .env into the container as the properties file that the
    # ${file:/data/credentials.properties:...} placeholders read from.
    volumes:
      - .env:/data/credentials.properties

Now this part seems to work. I'm not getting any errors here. The meta topics that I created for config, offset and status are populated once I run this.

Now I am creating CDC connector using an API call and this config:

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "debezium",
    "database.server.id": 1,
    "database.include.list": "mydb",
    "snapshot.mode": "schema_only",
    "topic.prefix": "analytics01",
    "table.include.list": "mydb.table1,mydb.table2",
    "schema.history.internal.kafka.bootstrap.servers": "${file:/data/credentials.properties:KAFKA_BOOTSTRAP_SERVER}",
    "schema.history.internal.kafka.topic": "${file:/data/credentials.properties:SERVER_NAME}.schemaChanges",
    "schema.history.consumer.security.protocol": "SASL_SSL",
    "schema.history.consumer.ssl.endpoint.identification.algorithm": "https",
    "schema.history.consumer.sasl.mechanism": "PLAIN",
    "schema.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:KAFKA_API_KEY}\" password=\"${file:/data/credentials.properties:KAFKA_API_SECRET}\";",
    "schema.history.producer.security.protocol": "SASL_SSL",
    "schema.history.producer.ssl.endpoint.identification.algorithm": "https",
    "schema.history.producer.sasl.mechanism": "PLAIN",
    "schema.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:KAFKA_API_KEY}\" password=\"${file:/data/credentials.properties:KAFKA_API_SECRET}\";",
    "decimal.handling.mode":"double",
    "transforms": "InsertField",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertField.static.field": "licenseId",
    "transforms.InsertField.static.value": "${file:/data/credentials.properties:LICENSE_ID}"
  }
}

The CDC connector fails:

{"name":"mysql-connector","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata\n"}],"type":"source"}

I tried to run everything in debug mode and search for some usable output but couldn't find any. Do you guys have an idea what I am doing wrong?

I tried different configurations and looked at guidelines like

https://docs.confluent.io/cloud/current/cp-component/connect-cloud-config.html

or

https://rmoff.net/2019/10/16/using-kafka-connect-and-debezium-with-confluent-cloud/

I also tried to hard-code the credentials in case there was something wrong with the credentials or env import, but that wasn't it.


Update 1

I forgot to highlight that I am using an .env file for the docker compose script to inject the credentials. This file is next to the docker-compose.yml and contains the following lines:

# Shared settings file: used for ${...} substitution in docker-compose.yml and
# mounted into the container as /data/credentials.properties for the connector
# config's ${file:...} placeholders. All values below are placeholders.
# Kafka API key/secret, used as the SASL username/password.
KAFKA_API_KEY=xxx
KAFKA_API_SECRET=xxx
# Bootstrap endpoint of the Kafka cluster (host:port).
KAFKA_BOOTSTRAP_SERVER=xxx.eu-central-1.aws.confluent.cloud:9092
# Static value added to every record by the InsertField transform.
LICENSE_ID=xxx
# Names of the Kafka Connect internal storage topics.
CONFIG_STORAGE_TOPIC=xxx-config
OFFSET_STORAGE_TOPIC=xxx-offset
STATUS_STORAGE_TOPIC=xxx-status
# Prefix used for the schema-history topic name (and topic.prefix in the solution).
SERVER_NAME=analytics01

Similarly, I am mounting the same file into the connect container and referencing the shown credentials in the connector config with e.g. ${file:/data/credentials.properties:KAFKA_API_KEY}.


Update 2

I already stated above why a connection / bad credential issue can be ruled out. The CONNECT PLATFORM is able to connect to the kafka cluster with the above configuration. To fortify this statement I deliberately used bad credentials and started the container. The connect platform itself checks the connection on startup. With e.g. bad API key or password you'll get this error:

connect-1  | Installing connector plugins
connect-1  | Launching Kafka Connect worker
connect-1  | ===> User
connect-1  | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
connect-1  | ===> Configuring ...
connect-1  | ===> Running preflight checks ... 
connect-1  | ===> Check if Kafka is healthy ...
connect-1  | log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.NetworkClient).
connect-1  | log4j:WARN Please initialize the log4j system properly.
connect-1  | log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
connect-1  | Error while getting broker list.
connect-1  | java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed
connect-1  |    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
connect-1  |    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
connect-1  |    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
connect-1  |    at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:147)
connect-1  |    at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:149)
connect-1  | Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed

With the credentials I use, I get:

Launching Kafka Connect worker
===> User
uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
===> Configuring ...
===> Running preflight checks ... 
===> Check if Kafka is healthy ...
Using log4j config /etc/cp-base-new/log4j.properties
===> Launching ... 
===> Launching kafka-connect ... 

That being said, I focus back on the other comments regarding the docker network and connector setup.


Solution

  • I finally managed to solve the issue. In the end I got lost between different versions of debezium, connect and outdated configuration keys.

    1. I ended up using the connect image provided by debezium, since this image already includes the connectors and some base configuration, which is more convenient to use than building my own image or installing the connector on the fly with docker compose.

    2. I removed the host networking mode and replaced it with an extra network.

    # Revised Compose service from the solution: uses the Debezium-provided
    # Connect image and no longer sets network_mode: host.
    # NOTE(review): this is an excerpt — the .env volume mount that the
    # ${file:/data/credentials.properties:...} placeholders rely on is not shown.
    services:
      connect:
        image: debezium/connect:latest
        # Settings and credentials are supplied through the .env file.
        env_file:
          - .env
        container_name: connect
        # Makes the hostname "dbhost" resolve to the host machine's private IP so
        # the container can reach the non-containerized MySQL instance;
        # HOST_PRIVATE_IP must be provided as a Compose variable.
        extra_hosts:
          - "dbhost:${HOST_PRIVATE_IP}"
    

    That way I can just work with env variables and cover my 2 use cases. Firstly, for testing I want to run the connector on the same EC2 machine as the DB (and the DB is not containerized). Just using the private IP of the EC2 machine here and using dbhost instead of localhost works for setting up the DB connection from inside the container. Secondly, I can easily deploy this somewhere else and it will still work (as long as the security groups allow the connection).

    3. Using docker env variables within others such as the SASL properties works perfectly fine. I recommend using my example above and also the org.apache.kafka.common.config.provider.FileConfigProvider to inject your credentials. For the latter you need to mount the env file as described so that you can use it in your connector config.

    4. With the most recent version of debezium you need to use the schema.history.internal prefix instead of the old database.history prefix. I did not replace the old keys correctly everywhere. What ended up happening was that I was using schema.history.producer.sasl.jaas.config, which is missing the internal part. The connect service could then interact with the kafka cluster, but the producer could not, which resulted in the timeout error. Facepalm …

    The corrected connector config looks like this:

        {
          "name": "mysql-connector",
          "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": "dbhost",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "debezium",
            "database.server.id": "01",
            "database.include.list": "mydb",
            "snapshot.mode": "schema_only",
            "topic.prefix": "${file:/data/credentials.properties:SERVER_NAME}",
            "table.include.list": "mydb.table1,mydb.table2",
            "schema.history.internal.kafka.bootstrap.servers": "${file:/data/credentials.properties:KAFKA_BOOTSTRAP_SERVER}",
            "schema.history.internal.kafka.topic": "${file:/data/credentials.properties:SERVER_NAME}.schemaChanges",
            "schema.history.internal.consumer.security.protocol": "SASL_SSL",
            "schema.history.internal.consumer.ssl.endpoint.identification.algorithm": "https",
            "schema.history.internal.consumer.sasl.mechanism": "PLAIN",
            "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:KAFKA_API_KEY}\" password=\"${file:/data/credentials.properties:KAFKA_API_SECRET}\";",
            "schema.history.internal.producer.security.protocol": "SASL_SSL",
            "schema.history.internal.producer.ssl.endpoint.identification.algorithm": "https",
            "schema.history.internal.producer.sasl.mechanism": "PLAIN",
            "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:KAFKA_API_KEY}\" password=\"${file:/data/credentials.properties:KAFKA_API_SECRET}\";",
            "decimal.handling.mode":"double",
            "transforms": "InsertField",
            "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.InsertField.static.field": "licenseId",
            "transforms.InsertField.static.value": "${file:/data/credentials.properties:LICENSE_ID}"
          }
        }