Tags: apache-kafka, apache-kafka-connect, avro, confluent-schema-registry, strimzi

Classes for Avro serialization not recognized in converter plugins, using Strimzi to deploy KafkaConnect container


Background: I want to use Avro serialization for payload values in a Kafka Connect cluster deployed with Strimzi. Although I added the required plugins to the Strimzi KafkaConnect YAML, I get a class-not-found error when I deploy the connector via the API. Has anyone managed to figure this out?

Relevant connector configuration specs:

"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<my url>"

KafkaConnect container configuration

build:
    output:
      type: docker
      image: <my image>
      pushSecret: <my secret>
    plugins:
      - name: mongodb-connector
        artifacts:
          - type: jar
            url: https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.5.1/mongo-kafka-connect-1.5.1-all.jar # https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.5.0/mongo-kafka-connect-1.5.0-all.jar
          - type: jar
            url: https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar
          - type: jar
            url: https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/7.7.1/kafka-connect-avro-converter-7.7.1.jar
          - type: jar
            url: https://packages.confluent.io/maven/io/confluent/kafka-schema-serializer/7.7.1/kafka-schema-serializer-7.7.1.jar
          - type: jar
            url: https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.7.1/kafka-schema-registry-client-7.7.1.jar
          

KafkaConnect error logs when I deploy the connector:

Failed to start task <task-name> (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-my-connect-cluster-connect-0.my-connect-cluster-connect.kafka-dev.svc:8083-6]
**java.lang.NoClassDefFoundError: io/confluent/kafka/serializers/AbstractKafkaAvroSerializer**
        at java.base/java.lang.ClassLoader.defineClass1(Native Method)
        at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
        at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:150)
        at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:524)
        at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:427)
        at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:421)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:420)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:116)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
        at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:80)
        at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:395)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:634)
        at org.apache.kafka.connect.runtime.Worker.startSourceTask(Worker.java:560)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:2001)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$38(DistributedHerder.java:2018)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
**Caused by: java.lang.ClassNotFoundException: io.confluent.kafka.serializers.AbstractKafkaAvroSerializer**
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:124)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
        ... 20 more

Solution

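  • You need kafka-avro-serializer, not kafka-schema-serializer. The missing class, io.confluent.kafka.serializers.AbstractKafkaAvroSerializer, is in kafka-avro-serializer:

    https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer/7.7.1

    You should also have used the Confluent Hub CLI to download the Avro converter and the Mongo connector rather than pulling individual JARs from Maven repositories, because listing JARs by hand doesn't guarantee that transitive dependencies are included.

    Ideally, you would pre-create the container image as well.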
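
As a rough sketch of the first point in the answer above, the missing artifact could be added to the existing plugin entry in the KafkaConnect build section. The URL below is an assumption: it follows the same packages.confluent.io path pattern as the other Confluent artifacts in the question and has not been verified here. Only the added entry is shown; the existing artifacts are left out for brevity.

```yaml
plugins:
  - name: mongodb-connector
    artifacts:
      # The artifacts already listed in the question would remain here.
      # Added entry: the JAR that contains
      # io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.
      # Assumed URL, modeled on the other Confluent artifact URLs above.
      - type: jar
        url: https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/7.7.1/kafka-avro-serializer-7.7.1.jar
```

Even with this entry, other transitive dependencies of the converter may still be missing, which is why the answer recommends Confluent Hub packaging or a pre-built image over hand-listed JARs.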