jdbc, apache-kafka, apache-kafka-connect, ksqldb

Why is my JDBC Kafka Connector not picking up new events or creating a topic after creating a source connector?


We currently have a connector up and running according to our ksqlDB Server:

 ENTITY_CHANGE  | SOURCE | io.confluent.connect.jdbc.JdbcSourceConnector | RUNNING (1/1 tasks RUNNING)

However, our source connector does not create a jdbc_entity_change topic, as it does when we create the same connector locally. The Kafka Connect server is embedded in our ksqlDB server.

CREATE SOURCE CONNECTOR entity_change WITH (
    'connector.class'          = 'io.confluent.connect.jdbc.JdbcSourceConnector',
    'connection.url'           = redacted,
    'connection.user'          = redacted,
    'connection.password'      = redacted,
    'topic.prefix'             = 'jdbc_',
    'mode'                     = 'timestamp+incrementing',
    'numeric.mapping'          = 'best_fit',
    'incrementing.column.name' = 'id',
    'timestamp.column.name'    = 'last_modified',
    'key'                      = 'id',
    'key.converter'            = 'org.apache.kafka.connect.converters.LongConverter',
    'query'                    = 'redacted'
);

We've confirmed that we have connectivity to our database since our database throws an error when we supply incorrect credentials.

Logs on ksqldb server:

2023-03-13 11:09:03 ksqldb-server              | [2023-03-13 18:09:03,164] INFO Instantiated connector ENTITY_CHANGE with version 10.0.0 of type class io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:322)
2023-03-13 11:09:03 ksqldb-server              | [2023-03-13 18:09:03,166] INFO Finished creating connector ENTITY_CHANGE (org.apache.kafka.connect.runtime.Worker:347)
2023-03-13 11:09:03 ksqldb-server              | [2023-03-13 18:09:03,172] INFO [Worker clientId=connect-1, groupId=ksql-connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1687)
2023-03-13 11:09:03 ksqldb-server              | [2023-03-13 18:09:03,174] INFO Starting JDBC Source Connector (io.confluent.connect.jdbc.JdbcSourceConnector:69)
2023-03-13 11:09:03 ksqldb-server              | [2023-03-13 18:09:03,182] INFO JdbcSourceConnectorConfig values:
2023-03-13 11:09:03 ksqldb-server              |        batch.max.rows = 100
2023-03-13 11:09:03 ksqldb-server              |        catalog.pattern = null
2023-03-13 11:09:03 ksqldb-server              |        connection.attempts = 3
2023-03-13 11:09:03 ksqldb-server              |        connection.backoff.ms = 10000
2023-03-13 11:09:03 ksqldb-server              |        connection.password = [hidden]
2023-03-13 11:09:03 ksqldb-server              |        connection.url = jdbc:redacted?zeroDateTimeBehavior=round
2023-03-13 11:09:03 ksqldb-server              |        connection.user = redacted
2023-03-13 11:09:03 ksqldb-server              |        db.timezone = UTC
2023-03-13 11:09:03 ksqldb-server              |        dialect.name =
2023-03-13 11:09:03 ksqldb-server              |        incrementing.column.name = id
2023-03-13 11:09:03 ksqldb-server              |        mode = timestamp+incrementing
2023-03-13 11:09:03 ksqldb-server              |        numeric.mapping = best_fit
2023-03-13 11:09:03 ksqldb-server              |        numeric.precision.mapping = false
2023-03-13 11:09:03 ksqldb-server              |        poll.interval.ms = 5000
2023-03-13 11:09:03 ksqldb-server              |        query = redacted
2023-03-13 11:09:03 ksqldb-server              |        query.suffix =
2023-03-13 11:09:03 ksqldb-server              |        quote.sql.identifiers = ALWAYS
2023-03-13 11:09:03 ksqldb-server              |        schema.pattern = null
2023-03-13 11:09:03 ksqldb-server              |        table.blacklist = []
2023-03-13 11:09:03 ksqldb-server              |        table.poll.interval.ms = 60000
2023-03-13 11:09:03 ksqldb-server              |        table.types = [TABLE]
2023-03-13 11:09:03 ksqldb-server              |        table.whitelist = []
2023-03-13 11:09:03 ksqldb-server              |        timestamp.column.name = [last_modified]
2023-03-13 11:09:03 ksqldb-server              |        timestamp.delay.interval.ms = 0
2023-03-13 11:09:03 ksqldb-server              |        timestamp.initial = null
2023-03-13 11:09:03 ksqldb-server              |        topic.prefix = jdbc_entity_change
2023-03-13 11:09:03 ksqldb-server              |        validate.non.null = true

Whenever we modify any fields that should be picked up by the SQL query in the query field of this source connector, nothing is picked up, nor is a topic created for the connector to emit events to.

Since creating the source connector does not generate a topic as it does locally, I also tried manually creating a topic and pointing the source connector at it via the topic.prefix field. This does not work either.

Does anybody have any ideas about what could be potentially misconfigured?


Solution

  • The issue was that I needed to supply authentication details for Kafka Connect at the producer and consumer level, on top of the base authentication settings. It's a little silly that this was the root cause, but this is what your brokers and your embedded Kafka Connect server need in order to communicate. I couldn't find this documented anywhere in the ksqlDB or Kafka documentation.

    You need to provide the following properties for this to work if your Kafka broker has authentication enabled.

    KSQL_CONNECT_SECURITY_PROTOCOL=SASL_PLAINTEXT
    KSQL_CONNECT_SASL_MECHANISM=SCRAM-SHA-256
    KSQL_CONNECT_SASL_JAAS_CONFIG=
    KSQL_CONNECT_CONSUMER_SASL_MECHANISM=SCRAM-SHA-256
    KSQL_CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_PLAINTEXT
    KSQL_CONNECT_CONSUMER_SASL_JAAS_CONFIG= 
    KSQL_CONNECT_PRODUCER_SASL_MECHANISM=SCRAM-SHA-256
    KSQL_CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_PLAINTEXT
    KSQL_CONNECT_PRODUCER_SASL_JAAS_CONFIG=
    

    These are Docker-specific configurations, FYI. If you were using a configuration file instead, the same settings would use the following syntax:

    consumer.sasl.jaas.config=
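
    For completeness, here is a sketch of how the full set of Docker variables listed above would look in properties-file form. It assumes the usual ksqlDB Docker convention that the KSQL_CONNECT_ prefix is dropped, the rest is lowercased, and underscores become dots. I haven't verified this against a file-based setup, so treat the names as derived rather than tested. The JAAS values are left empty here, as above; fill in your own login module and credentials.

    ```properties
    # Sketch only: names derived from the KSQL_CONNECT_* variables above,
    # assuming prefix removal, lowercasing, and "_" -> "." conversion.
    # JAAS values are intentionally left empty.

    # Base (worker-level) client settings
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-256
    sasl.jaas.config=

    # Overrides for the consumers Kafka Connect creates
    consumer.security.protocol=SASL_PLAINTEXT
    consumer.sasl.mechanism=SCRAM-SHA-256
    consumer.sasl.jaas.config=

    # Overrides for the producers Kafka Connect creates (used by source connectors)
    producer.security.protocol=SASL_PLAINTEXT
    producer.sasl.mechanism=SCRAM-SHA-256
    producer.sasl.jaas.config=
    ```

    The producer.* block is the one that matters for a source connector like the JDBC one in the question, since its tasks write to Kafka through a producer that does not automatically inherit the base settings.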