I am trying to get a distributed setup of the ignite-connector running, but it does not work. I captured the log produced when creating the connector via the REST API, using this request to /connectors:
{
"name": "ignite-connector",
"config": {
"connector.class": "org.apache.ignite.stream.kafka.connect.IgniteSinkConnector",
"tasks.max": "2",
"topics": "someTopic1",
"cacheName": "myCache",
"cacheAllowOverwrite": true,
"igniteCfg":"/opt/ignite/examples/config/example-cache.xml"}
}
I set up the ignite-connector as a plugin: I built an uber-jar from the repo, put it in a separate directory, and included that directory as a plugin in the .properties file I use to start connect-distributed.sh.
I also set the classpath for both the connector and the Kafka services, which I manage with systemd:
Environment=CLASSPATH=/opt/kafka/ignite-connector/*
The full error log follows:
[2022-11-17 19:49:30,268] INFO [ignite-connector|worker] SinkConnectorConfig values:
config.action.reload = restart
connector.class = org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = ignite-connector
predicates = []
tasks.max = 2
topics = [someTopic1]
topics.regex =
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SinkConnectorConfig:376)
[2022-11-17 19:49:30,272] INFO [ignite-connector|worker] EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = ignite-connector
predicates = []
tasks.max = 2
topics = [someTopic1]
topics.regex =
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:376)
[2022-11-17 19:49:30,276] INFO [ignite-connector|worker] Instantiated connector ignite-connector with version 3.3.1 of type class org.apache.ignite.stream.kafka.connect.IgniteSinkConnector (org.apache.kafka.connect.runtime.Worker:322)
[2022-11-17 19:49:30,276] INFO [ignite-connector|worker] Finished creating connector ignite-connector (org.apache.kafka.connect.runtime.Worker:347)
[2022-11-17 19:49:30,277] ERROR [ignite-connector|worker] WorkerConnector{id=ignite-connector} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:201)
java.lang.NoClassDefFoundError: org/apache/ignite/internal/util/typedef/internal/A
at org.apache.ignite.stream.kafka.connect.IgniteSinkConnector.start(IgniteSinkConnector.java:55)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:193)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:218)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:363)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:346)
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:146)
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-11-17 19:49:30,277] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1687)
[2022-11-17 19:49:30,280] ERROR [ignite-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'ignite-connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1811)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: ignite-connector
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$35(DistributedHerder.java:1782)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:349)
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:146)
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to transition connector ignite-connector to state STARTED
... 8 more
Caused by: java.lang.NoClassDefFoundError: org/apache/ignite/internal/util/typedef/internal/A
at org.apache.ignite.stream.kafka.connect.IgniteSinkConnector.start(IgniteSinkConnector.java:55)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:193)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:218)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:363)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:346)
... 7 more
The class in question (A) is included in ignite-core-2.9.1.jar, which is bundled in the uber-jar in the plugin directory.
Any pointers are appreciated.
There seems to be a misunderstanding about what "plugins" are. Plugins are only classes defined as implementations of Converters, Transforms, and Connectors.
Internal Ignite classes are none of these, so they would not be loaded by the plugin.path classloader.
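To fix this, make sure you export a CLASSPATH that covers the Ignite JARs, for example CLASSPATH=/path/to/ignite-files/* (the JVM expands a trailing * to every JAR in that directory, whereas a *.jar pattern is not expanded). You can use jar -tf commands to validate that the class exists in a specific JAR before running the Connect process.
This is not a hack; it is simply how Java class loading works.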
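If there are many JARs to check, the jar -tf step can be scripted. The following is only a minimal sketch, not part of Kafka or Ignite: the helper names class_entry and jars_containing are made up for illustration, and it assumes the JARs sit directly in one directory and are plain zip archives (classes inside nested JARs are not inspected).

```python
import zipfile
from pathlib import Path


def class_entry(class_name):
    """Turn a dotted or slash-separated class name into its JAR entry path."""
    return class_name.replace(".", "/") + ".class"


def jars_containing(directory, class_name):
    """Return the sorted file names of JARs in `directory` that contain the class.

    Hypothetical helper: roughly what `jar -tf some.jar | grep ClassName`
    does, repeated for every *.jar file in one directory.
    """
    entry = class_entry(class_name)
    hits = []
    for jar in sorted(Path(directory).glob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            # namelist() returns every entry path stored in the archive
            if entry in zf.namelist():
                hits.append(jar.name)
    return hits
```

For the error above, you could call jars_containing("/opt/kafka/ignite-connector", "org.apache.ignite.internal.util.typedef.internal.A") and expect the uber-jar's file name in the result. An empty list would mean the class is not packaged at the top level of any JAR in that directory.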