I am consuming messages from a topic using schema registry. For the consumer part i am trying to use
io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializerwhile consuming the messages in my KafkaListener. But after adding the
KafkaJsonSchemaDeserializer` my spring boot application is not starting and is showing the below exception. Sharing the app logs here:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v3.1.0)
2023-06-12T23:45:37.069+05:30 INFO 19132 --- [ main] c.example.KafkaPOC.demo.DemoApplication : Starting DemoApplication using Java 17.0.1 with PID 19132 (C:\Users\Debjit\Downloads\demo\target\classes started by Debjit in C:\Users\Debjit\Downloads\demo)
2023-06-12T23:45:37.075+05:30 INFO 19132 --- [ main] c.example.KafkaPOC.demo.DemoApplication : No active profile set, falling back to 1 default profile: "default"
2023-06-12T23:45:39.997+05:30 INFO 19132 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 61002 (http)
2023-06-12T23:45:40.020+05:30 INFO 19132 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2023-06-12T23:45:40.021+05:30 INFO 19132 --- [ main] o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/10.1.8]
2023-06-12T23:45:40.336+05:30 INFO 19132 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2023-06-12T23:45:40.342+05:30 INFO 19132 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 3158 ms
2023-06-12T23:45:42.281+05:30 INFO 19132 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 61002 (http) with context path ''
2023-06-12T23:45:42.382+05:30 INFO 19132 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = earliest
bootstrap.servers = [pkc-ymrq7.us-east-2.aws.confluent.cloud:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-aws-topic-consumer-group-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = aws-topic-consumer-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = PLAIN
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer
2023-06-12T23:45:42.512+05:30 INFO 19132 --- [ main] o.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
2023-06-12T23:45:42.512+05:30 INFO 19132 --- [ main] o.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-06-12T23:45:42.518+05:30 INFO 19132 --- [ main] o.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2023-06-12T23:45:42.523+05:30 INFO 19132 --- [ main] o.a.kafka.common.utils.AppInfoParser : App info kafka.consumer for consumer-aws-topic-consumer-group-1 unregistered
2023-06-12T23:45:42.524+05:30 WARN 19132 --- [ main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
2023-06-12T23:45:42.569+05:30 INFO 19132 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2023-06-12T23:45:42.610+05:30 INFO 19132 --- [ main] .s.b.a.l.ConditionEvaluationReportLogger :
Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
2023-06-12T23:45:42.652+05:30 ERROR 19132 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-6.0.9.jar:6.0.9]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-6.0.9.jar:6.0.9]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-6.0.9.jar:6.0.9]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-6.0.9.jar:6.0.9]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:958) ~[spring-context-6.0.9.jar:6.0.9]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:611) ~[spring-context-6.0.9.jar:6.0.9]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.1.0.jar:3.1.0]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:733) ~[spring-boot-3.1.0.jar:3.1.0]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:435) ~[spring-boot-3.1.0.jar:3.1.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:311) ~[spring-boot-3.1.0.jar:3.1.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1305) ~[spring-boot-3.1.0.jar:3.1.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1294) ~[spring-boot-3.1.0.jar:3.1.0]
at com.example.KafkaPOC.demo.DemoApplication.main(DemoApplication.java:10) ~[classes/:na]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:830) ~[kafka-clients-3.4.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) ~[kafka-clients-3.4.0.jar:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:483) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:451) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:427) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:394) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:371) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:867) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:382) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:555) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:231) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:555) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:383) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:328) ~[spring-kafka-3.0.7.jar:3.0.7]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-6.0.9.jar:6.0.9]
... 13 common frames omitted
Caused by: java.lang.NoClassDefFoundError: io/confluent/kafka/schemaregistry/rules/RuleException
at java.base/java.lang.Class.getDeclaredConstructors0(Native Method) ~[na:na]
at java.base/java.lang.Class.privateGetDeclaredConstructors(Class.java:3373) ~[na:na]
at java.base/java.lang.Class.getConstructor0(Class.java:3578) ~[na:na]
at java.base/java.lang.Class.getDeclaredConstructor(Class.java:2754) ~[na:na]
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:395) ~[kafka-clients-3.4.0.jar:na]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:401) ~[kafka-clients-3.4.0.jar:na]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:436) ~[kafka-clients-3.4.0.jar:na]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:421) ~[kafka-clients-3.4.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:716) ~[kafka-clients-3.4.0.jar:na]
... 27 common frames omitted
Caused by: java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.rules.RuleException
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[na:na]
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[na:na]
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[na:na]
... 36 common frames omitted
Disconnected from the target VM, address: '127.0.0.1:49806', transport: 'socket'
Process finished with exit code 1
For the Kafka Topic i have set JSON Schema as the schema.
Sharing my application.yml file :
`server:
port: 61002
spring:
kafka:
bootstrap-servers: pkc-ymrq7.us-east-2.aws.confluent.cloud:9092
properties:
schema:
registry:
url: https://psrc-mw0d1.us-east-2.aws.confluent.cloud
basic:
auth:
credentials:
source: USER_INFO
user:
info: API_KEY:API_SECRET
sasl:
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule
required username="API_KEY" password="API_SECRET";
mechanism: PLAIN
security:
protocol: SASL_SSL
ssl:
endpoint:
identification:
algorithm: https
consumer:
autoOffsetReset: earliest
group-id: aws-topic-consumer-group
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
valueDeserializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer
confluent:
topic: schema_topic`
This is my KafkaConsumer :
`package com.example.KafkaPOC.demo.service;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class CoreService {
@Value("${confluent.topic}")
private String inputTopic;
@Autowired
KafkaTemplate<String, String> kafkaTemplate;`your text`
@KafkaListener(topics = "schema_topic", groupId = "aws-topic-consumer-group",containerFactory = "kafkaListenerContainerFactory")
public void consumeFromKafkaViaSchemaValidation(ConsumerRecord<String, JsonNode> record){
log.info("Message consumed from topic "+inputTopic+":- " + record.value());
}
}`
`
This is my project structure for reference.
I am consuming from the topic schema_topic which is having a JSON Schema set on it in confluent cloud. PFA the attachment.
The RuleException class is part of the kafka-schema-registry-client
dependency, which should be included as part of json-schema-serializer
one, but really depends if you are using mismatched versions of both...
That class was only recently introduced to the Schema Registry codebase, so I would first verify if you need to be using the latest version of the json schema dependency or not