Tags: java · spring-boot · apache-kafka · spring-kafka · confluent-schema-registry

Getting the below exception, `java.lang.NoClassDefFoundError: io/confluent/kafka/schemaregistry/rules/RuleException`, when trying to add a `KafkaJsonSchemaDeserializer` for Confluent Kafka


I am consuming messages from a topic that uses Schema Registry. For the consumer part, I am trying to use `io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer` while consuming the messages in my `@KafkaListener`. But after adding the `KafkaJsonSchemaDeserializer`, my Spring Boot application no longer starts and throws the exception below. Sharing the app logs here:


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.1.0)

2023-06-12T23:45:37.069+05:30  INFO 19132 --- [           main] c.example.KafkaPOC.demo.DemoApplication  : Starting DemoApplication using Java 17.0.1 with PID 19132 (C:\Users\Debjit\Downloads\demo\target\classes started by Debjit in C:\Users\Debjit\Downloads\demo)
2023-06-12T23:45:37.075+05:30  INFO 19132 --- [           main] c.example.KafkaPOC.demo.DemoApplication  : No active profile set, falling back to 1 default profile: "default"
2023-06-12T23:45:39.997+05:30  INFO 19132 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 61002 (http)
2023-06-12T23:45:40.020+05:30  INFO 19132 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2023-06-12T23:45:40.021+05:30  INFO 19132 --- [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.8]
2023-06-12T23:45:40.336+05:30  INFO 19132 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2023-06-12T23:45:40.342+05:30  INFO 19132 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 3158 ms
2023-06-12T23:45:42.281+05:30  INFO 19132 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 61002 (http) with context path ''
2023-06-12T23:45:42.382+05:30  INFO 19132 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.include.jmx.reporter = true
    auto.offset.reset = earliest
    bootstrap.servers = [pkc-ymrq7.us-east-2.aws.confluent.cloud:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-aws-topic-consumer-group-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = aws-topic-consumer-group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = PLAIN
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = SASL_SSL
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 45000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer

2023-06-12T23:45:42.512+05:30  INFO 19132 --- [           main] o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
2023-06-12T23:45:42.512+05:30  INFO 19132 --- [           main] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-06-12T23:45:42.518+05:30  INFO 19132 --- [           main] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
2023-06-12T23:45:42.523+05:30  INFO 19132 --- [           main] o.a.kafka.common.utils.AppInfoParser     : App info kafka.consumer for consumer-aws-topic-consumer-group-1 unregistered
2023-06-12T23:45:42.524+05:30  WARN 19132 --- [           main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
2023-06-12T23:45:42.569+05:30  INFO 19132 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-06-12T23:45:42.610+05:30  INFO 19132 --- [           main] .s.b.a.l.ConditionEvaluationReportLogger : 

Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
2023-06-12T23:45:42.652+05:30 ERROR 19132 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-6.0.9.jar:6.0.9]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-6.0.9.jar:6.0.9]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-6.0.9.jar:6.0.9]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-6.0.9.jar:6.0.9]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:958) ~[spring-context-6.0.9.jar:6.0.9]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:611) ~[spring-context-6.0.9.jar:6.0.9]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.1.0.jar:3.1.0]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:733) ~[spring-boot-3.1.0.jar:3.1.0]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:435) ~[spring-boot-3.1.0.jar:3.1.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:311) ~[spring-boot-3.1.0.jar:3.1.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1305) ~[spring-boot-3.1.0.jar:3.1.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1294) ~[spring-boot-3.1.0.jar:3.1.0]
    at com.example.KafkaPOC.demo.DemoApplication.main(DemoApplication.java:10) ~[classes/:na]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:830) ~[kafka-clients-3.4.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) ~[kafka-clients-3.4.0.jar:na]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:483) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:451) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:427) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:394) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:371) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:867) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:382) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:555) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:231) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:555) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:383) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:328) ~[spring-kafka-3.0.7.jar:3.0.7]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-6.0.9.jar:6.0.9]
    ... 13 common frames omitted
Caused by: java.lang.NoClassDefFoundError: io/confluent/kafka/schemaregistry/rules/RuleException
    at java.base/java.lang.Class.getDeclaredConstructors0(Native Method) ~[na:na]
    at java.base/java.lang.Class.privateGetDeclaredConstructors(Class.java:3373) ~[na:na]
    at java.base/java.lang.Class.getConstructor0(Class.java:3578) ~[na:na]
    at java.base/java.lang.Class.getDeclaredConstructor(Class.java:2754) ~[na:na]
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:395) ~[kafka-clients-3.4.0.jar:na]
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:401) ~[kafka-clients-3.4.0.jar:na]
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:436) ~[kafka-clients-3.4.0.jar:na]
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:421) ~[kafka-clients-3.4.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:716) ~[kafka-clients-3.4.0.jar:na]
    ... 27 common frames omitted
Caused by: java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.rules.RuleException
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[na:na]
    ... 36 common frames omitted

Disconnected from the target VM, address: '127.0.0.1:49806', transport: 'socket'

Process finished with exit code 1


For the Kafka topic, I have set JSON Schema as the schema type.

Sharing my application.yml file :

```yaml
server:
  port: 61002

spring:
  kafka:
    bootstrap-servers: pkc-ymrq7.us-east-2.aws.confluent.cloud:9092
    properties:
      schema:
        registry:
          url: https://psrc-mw0d1.us-east-2.aws.confluent.cloud
      basic:
        auth:
          credentials:
            source: USER_INFO
          user:
            info: API_KEY:API_SECRET
      sasl:
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule
            required username="API_KEY" password="API_SECRET";
        mechanism: PLAIN
      security:
        protocol: SASL_SSL
      ssl:
        endpoint:
          identification:
            algorithm: https
    consumer:
      autoOffsetReset: earliest
      group-id: aws-topic-consumer-group
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer
confluent:
  topic: schema_topic
```
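As an aside: because the listener below receives a `ConsumerRecord<String, JsonNode>`, the deserializer's default (generic JSON) output may be enough. If a typed POJO is wanted instead, the Confluent deserializer can be pointed at a class via its `json.value.type` property — a hedged sketch, where `com.example.KafkaPOC.demo.model.Payment` is a hypothetical class, not one from the original project:

```yaml
spring:
  kafka:
    consumer:
      properties:
        # Optional: deserialize values into a specific POJO instead of a generic JsonNode.
        # The target class here is hypothetical.
        json.value.type: com.example.KafkaPOC.demo.model.Payment
```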

This is my KafkaConsumer :

```java
package com.example.KafkaPOC.demo.service;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class CoreService {

    @Value("${confluent.topic}")
    private String inputTopic;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = "schema_topic", groupId = "aws-topic-consumer-group", containerFactory = "kafkaListenerContainerFactory")
    public void consumeFromKafkaViaSchemaValidation(ConsumerRecord<String, JsonNode> record) {
        log.info("Message consumed from topic " + inputTopic + ":- " + record.value());
    }
}
```

This is my project structure for reference.


I am consuming from the topic `schema_topic`, which has a JSON Schema set on it in Confluent Cloud.



Solution

  • The `RuleException` class is part of the `kafka-schema-registry-client` dependency, which should normally be pulled in transitively by the `kafka-json-schema-serializer` one — but not if you are using mismatched versions of the two.

    That class was only recently introduced to the Schema Registry codebase, so I would first verify whether you actually need to be on the latest version of the JSON Schema dependency, and in any case make sure the serializer and registry-client versions match.
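A hedged sketch of how the dependencies can be lined up in Maven — the version shown (7.4.0) is an example, not taken from the original project; the point is that `kafka-json-schema-serializer` and `kafka-schema-registry-client` must come from the same Confluent release so that newer classes like `RuleException` resolve on the classpath:

```xml
<!-- Confluent artifacts are hosted in the Confluent repository, not Maven Central -->
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <!-- Brings in KafkaJsonSchemaDeserializer; 7.4.0 is an example version -->
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-json-schema-serializer</artifactId>
    <version>7.4.0</version>
  </dependency>
  <!-- Pin the registry client to the SAME version; a mismatch is what produces
       NoClassDefFoundError for io.confluent.kafka.schemaregistry.rules.RuleException -->
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>7.4.0</version>
  </dependency>
</dependencies>
```

Running `mvn dependency:tree` and checking which version of `kafka-schema-registry-client` actually ends up on the classpath is a quick way to confirm whether a transitive dependency is dragging in an older client.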