I am trying to create a simple Kafka producer. I followed a tutorial since I am new to this topic. I created a config file as recommended in the video. Here is the config file I am using.
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, KafkaProducerModel> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, KafkaProducerModel> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
I am getting the following error when I try to run my application.
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerGroupMetadata
at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
at java.lang.Class.privateGetDeclaredMethods(Class.java:3166) ~[?:?]
at java.lang.Class.getDeclaredMethods(Class.java:2309) ~[?:?]
at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.util.ReflectionUtils.doWithLocalMethods(ReflectionUtils.java:321) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor.buildPersistenceMetadata(PersistenceAnnotationBeanPostProcessor.java:438) ~[spring-orm-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor.findPersistenceMetadata(PersistenceAnnotationBeanPostProcessor.java:409) ~[spring-orm-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor.postProcessMergedBeanDefinition(PersistenceAnnotationBeanPostProcessor.java:336) ~[spring-orm-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyMergedBeanDefinitionPostProcessors(AbstractAutowireCapableBeanFactory.java:1094) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:569) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 15 more
Its a gradle project and here is the dependency
implementation 'org.springframework.kafka:spring-kafka:2.5.6.RELEASE'
I did add the dependency after reviewing a stackover flow thread. However it did not resolve the error. Any lead would be helpful.
implementation 'org.apache.tomcat.embed:tomcat-embed-core'
You appear to have the wrong version of kafka-clients
on the classpath. spring-kafka 2.5.6 uses kafka-clients 2.5.1. You should not declare the version yourself, Spring Boot will pull in the correct version.
Spring Boot 2.3.4 will pull in this version of spring-kafka and its dependencies. If you are using an older boot, you need to override all the dependencies.