Search code examples
mysqlsslapache-kafka-connectdebezium

Error connecting to Cloud SQL with SSL using Debezium


Objective: To use debezium to capture changes from Cloud SQL. The instance of Cloud SQL is SSL enabled according to the instructions here

Scenario: I have debezium connect, kafka and zookpeer running as docker containers on my local machine. I have tested the setup against a Cloud SQL instance without SSL. Things are working. Once I enable SSL, convert the pem files (server-ca.pem, client-cert.pem, client-key.pem) to keystore and truststore, mount them as files in the debezium connect docker container, I get an error in Debezium container logs as such (after sending the POST request to the endpoint):

org.apache.kafka.connect.errors.ConnectException: Error reading MySQL variables: Access denied for user 'user'@'redacted_my_ip' (using password: YES)
    at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:297)
    at io.debezium.connector.mysql.MySqlJdbcContext.readMySqlSystemVariables(MySqlJdbcContext.java:278)
    at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:81)
    at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:53)
    at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:331)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:136)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Access denied for user 'user'@'redacted_my_ip' (using password: YES)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:835)
    at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:455)
    at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:240)
    at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:207)
    at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:179)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:756)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:751)
    at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:298)
    at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:284)
    ... 14 more

My preliminary analysis: I have been going through the TRACE logs and the source code. According to the logs, it is able to successfully test the connection

2019-04-10 10:11:45,777 INFO   ||  Successfully tested connection for jdbc:mysql://redacted_ip:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=true&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL with user 'user'   [io.debezium.connector.mysql.MySqlConnector]

This log is generated right after line 101

Here we execute a query on the database, which only on successful execution will let the control flow to the log. This according to me implies that debezium connect is able to connect to the database, but fails at some other place. According to the stack trace, debezium fails to connect the second time, here

The payload that I am sending is as follows:

{
    "name": "connector-test",
    "config": {
        "connector.class": "MySql",
        "tasks.max": "1",
        "database.hostname": "redacted_ip",
        "database.port": "3306",
        "database.user": "user",
        "database.password": "redacted_user_password",
        "database.server.name": "dbserver1",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.inventory",
        "database.ssl.mode": "required",
        "database.ssl.keystore": "./keystore",
        "database.ssl.keystore.password": "redacted_keystore_password",
        "database.ssl.truststore": "./truststore",
        "database.ssl.truststore.password": "redacted_truststore_password"
    }
}

What steps are needed in order to get the above setup working


Solution

  • This issue has been fixed and now merged on the master branch with this PR