Search code examples
sql-serverapache-kafkaapache-kafka-connectksqldbdebezium

How to use the Debezium SQL Server connector with ksqlDB embedded Connect?


After a day spent experimenting with various configuration options, I've not been able to get the Debezium SQL Server connector to work with ksqlDB embedded Kafka Connect. There appears to be no clear guidance on either the ksqlDB or Debezium websites on how to set this up.

What documentation there is suggests that one just needs to install the relevant Kafka Connect plugin into a specified location and everything should work magically out of the box... but unfortunately I've had no such luck and the error messages I'm receiving aren't giving me any feedback to work with.

My docker-compose.yml looks as follows and I have DEBEZIUM_VERSION=1.3 and KSQLDB_VERSION=0.11.0 in my .env file:

version: '3.7'
services:
  # ***********************
  # * Kafka               *
  # ***********************
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    networks:
      - infrastructure
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888

  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    depends_on:
      - zookeeper
    networks:
      - infrastructure
    ports:
     - 9092:9092
     - 19092:19092
    environment:
     - BROKER_ID=1
     - ZOOKEEPER_CONNECT=zookeeper:2181
     - ALLOW_PLAINTEXT_LISTENER=yes

  schema-registry:
    image: confluentinc/cp-schema-registry
    depends_on:
      - zookeeper
      - kafka
    networks:
      - infrastructure
    ports:
     - 8181:8181
     - 8081:8081
    environment:
     - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
     - SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS=GET,POST,PUT,OPTIONS
     - SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN=*

  schema-registry-ui:
    image: landoop/schema-registry-ui
    depends_on:
      - schema-registry
    networks:
      - infrastructure
    ports:
     - 8000:8000
    environment:
     - SCHEMAREGISTRY_URL=http://schema-registry:8081
     - PROXY=true

  ksqldb-server:
    image: confluentinc/ksqldb-server:${KSQLDB_VERSION}
    build:
      context: ksqldb-sqlserver
      args:
        KSQLDB_VERSION: ${KSQLDB_VERSION}
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
      - schema-registry
    networks:
      - infrastructure
    ports:
      - "8088:8088"
    #volumes:
    #  - "./confluent-hub-components/:/usr/share/kafka/plugins/"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/confluent-hub-components"

  # *-----------------------------*
    # To connect to the DB: 
    #   docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
    # *-----------------------------*  
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:${KSQLDB_VERSION}
    container_name: ksqldb-cli
    depends_on:
      - kafka
      - ksqldb-server
    networks:
      - infrastructure
    entrypoint: /bin/sh
    tty: true

networks:
  infrastructure:
    name: infrastructure

My Dockerfile for the ksqldb-sqlserver service above, which basically just takes the default image and then copies the contents of the Debezium SQL Server connector plugin to /usr/share/confluent-hub-components, looks as follows:

ARG KSQLDB_VERSION

FROM confluentinc/ksqldb-server:${KSQLDB_VERSION}

ARG DEBEZIUM_VERSION=1.3.0.Beta2

ENV PLUGINS_DIR=/usr/share/confluent-hub-components/
ENV CONNECT_PLUGIN_PATH="/usr/share/confluent-hub-components"
USER root
RUN mkdir -p $PLUGINS_DIR && chown -R appuser: $PLUGINS_DIR
USER appuser
RUN cd $PLUGINS_DIR \
    && curl -sO https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/$DEBEZIUM_VERSION/debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz \
    && tar -xzf debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz \
    && rm debezium-connector-sqlserver-$DEBEZIUM_VERSION-plugin.tar.gz

And finally the statement I use to create the connector in ksqldb-cli is as follows:

CREATE SOURCE CONNECTOR accounts_reader WITH (
    'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '1433',
    'database.user' = '******',
    'database.password' = ''******',
    'database.dbname' = ''******',
    'database.server.name' = ''******',
    'table.whitelist' = ''******',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic' = ''******',
    'tasks.max' = '1'
);

The ksqlDB CLI returns 'HTTP ERROR 500 Request failed' and, on inspecting the Docker logs for ksqldb-server, I see the following cryptic message (which I've had to truncate slightly because of StackOverflow character limits):

[2020-09-17 21:34:15,983] INFO Received: KsqlRequest{ksql='CREATE SOURCE CONNECTOR accounts_reader WITH (
    'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '1433',
    'database.user' = '******',
    'database.password' = '******',
    'database.dbname' = '******',
    'database.server.name' = '******',
    'table.whitelist' = '******',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic' = '******',
    'tasks.max' = '1'
);', configOverrides={}, requestProperties={}, commandSequenceNumber=Optional[-1]} (io.confluent.ksql.rest.server.resources.KsqlResource:223)
[2020-09-17 21:34:16,156] WARN /connectors (org.eclipse.jetty.server.HttpChannel:600)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/debezium/DebeziumException
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/debezium/DebeziumException
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 30 more
...
...
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
[2020-09-17 21:34:16,180] WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:768)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
[2020-09-17 21:34:16,191] WARN /connectors (org.eclipse.jetty.server.HttpChannel:600)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
...
...
[2020-09-17 21:34:16,192] WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:768)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:551)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1369)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:489)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1284)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:501)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:272)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
    at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 34 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.sqlserver.SqlServerConnectorConfig
    at io.debezium.connector.sqlserver.SqlServerConnector.config(SqlServerConnector.java:57)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
[2020-09-17 21:34:16,194] WARN Failed to query connect cluster after 3 attempts. (io.confluent.ksql.services.DefaultConnectClient:278)
[2020-09-17 21:34:16,195] WARN Did not CREATE connector ACCOUNTS_READER: <html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
<title>Error 500 Request failed.</title>
</head>
<body><h2>HTTP ERROR 500 Request failed.</h2>
<table>
<tr><th>URI:</th><td>/connectors</td></tr>
<tr><th>STATUS:</th><td>500</td></tr>
<tr><th>MESSAGE:</th><td>Request failed.</td></tr>
<tr><th>SERVLET:</th><td>org.glassfish.jersey.servlet.ServletContainer-7c956dda</td></tr>
</table>
<hr><a href="http://eclipse.org/jetty">Powered by Jetty:// 9.4.30.v20200611</a><hr/>

</body>
</html>
 (io.confluent.ksql.services.DefaultConnectClient:115)
[2020-09-17 21:34:16,196] INFO Processed successfully: KsqlRequest{ksql='CREATE SOURCE CONNECTOR accounts_reader WITH (
    'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
    'database.hostname' = 'host.docker.internal',
    'database.port' = '1433',
    'database.user' = '******',
    'database.password' = '******',
    'database.dbname' = '******',
    'database.server.name' = '******',
    'table.whitelist' = '******',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic' = '******',
    'tasks.max' = '1'
);', configOverrides={}, requestProperties={}, commandSequenceNumber=Optional[-1]} (io.confluent.ksql.rest.server.resources.KsqlResource:261)

I'm not 100% sure, but I don't think this means that the SqlServerConnectorConfig class is missing, but rather that there was a fundamental error on initialization (probably something more fundamental than some errors in the connector configuration).

Is anyone able to point out where I have gone wrong or, alternatively, an easy way to set up Debezium source connectors and JDBC sink connectors in ksqlDB embedded Connect?


Solution

  • Here's a working Docker Compose for you. From it can you distil out how to build your own Docker image if you want to

    ---
    version: '2'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:5.5.1
        container_name: zookeeper
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
    
      broker:
        image: confluentinc/cp-kafka:5.5.1
        container_name: broker
        depends_on:
          - zookeeper
        ports:
        # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
        # An important note about accessing Kafka from clients on other machines:
        # -----------------------------------------------------------------------
        #
        # The config used here exposes port 9092 for _external_ connections to the broker
        # i.e. those from _outside_ the docker network. This could be from the host machine
        # running docker, or maybe further afield if you've got a more complicated setup.
        # If the latter is true, you will need to change the value 'localhost' in
        # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
        # remote clients
        #
        # For connections _internal_ to the docker network, such as from other services
        # and components, use kafka:29092.
        #
        # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
        # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
        #
          - 9092:9092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
    
      schema-registry:
        image: confluentinc/cp-schema-registry:5.5.1
        container_name: schema-registry
        ports:
          - "8081:8081"
        depends_on:
          - zookeeper
          - broker
        environment:
          SCHEMA_REGISTRY_HOST_NAME: schema-registry
          SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
    
      ksqldb:
        # *-----------------------------*
        # To connect to ksqlDB CLI
        #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
        # *-----------------------------*
        image: confluentinc/ksqldb-server:0.11.0
        container_name: ksqldb
        depends_on:
          - broker
        ports:
          - "8088:8088"
          - "8083:8083"
        user: root
        environment:
          KSQL_LISTENERS: http://0.0.0.0:8088
          KSQL_BOOTSTRAP_SERVERS: broker:29092
          KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
          KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
          KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
          # Setting KSQL_KSQL_CONNECT_WORKER_CONFIG enables embedded Kafka Connect
          KSQL_KSQL_CONNECT_WORKER_CONFIG: "/etc/ksqldb/connect.properties"
          # Kafka Connect config below
          KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
          KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: 'ksqldb'
          KSQL_CONNECT_REST_PORT: 8083
          KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
          KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
          KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
          KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
          KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
          KSQL_CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
          KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
          KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
          KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
          KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
          KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
          KSQL_CONNECT_PLUGIN_PATH: '/usr/share/java'
    
        command:
          # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
          - bash
          - -c
          - |
            echo "Installing connector plugins"
            # ------ hack to workaround absence of confluent-hub client
            # mkdir -p /usr/share/confluent-hub-components/
            # confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ debezium/debezium-connector-sqlserver:1.2.2
            curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip
            yum install -y unzip
            unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/
            # ----------------------------------------------------------
            #
            echo "Launching ksqlDB"
            /usr/bin/docker/run &
    
            sleep infinity
    
    

    You can find a full copy of the Docker Compose including a sample MS SQL container configured for CDC here

    For a full walk through and working example see this blog: https://rmoff.net/2020/09/18/using-the-debezium-ms-sql-connector-with-ksqldb-embedded-kafka-connect/