I am developing Spring Boot + Apache Kafka + Apache Zookeeper example. I've installed/setup Apache Zookeeper and Apache Kafka on my local Windows machine. I've taken a reference from link: https://www.tutorialspoint.com/spring_boot/spring_boot_apache_kafka.htm and executed code as is:
Setup: https://medium.com/@shaaslam/installing-apache-kafka-on-windows-495f6f2fd3c8
Error:
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
at com.tutorialspoint.SpringKafkaTutorialspointApplication.main(SpringKafkaTutorialspointApplication.java:21) [classes/:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Logs from Zookeeper server:
2018-12-14 22:16:35,352 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60197 (no session established for client)
2018-12-14 22:16:36,260 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60200
2018-12-14 22:16:36,260 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:36,262 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60200 (no session established for client)
2018-12-14 22:16:37,473 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60204
2018-12-14 22:16:37,473 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:37,476 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60204 (no session established for client)
2018-12-14 22:16:38,383 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60207
2018-12-14 22:16:38,384 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:38,388 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60207 (no session established for client)
2018-12-14 22:16:39,494 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60211
2018-12-14 22:16:39,494 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:39,497 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60211 (no session established for client)
2018-12-14 22:16:40,506 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60214
2018-12-14 22:16:40,507 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:40,509 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60214 (no session established for client)
2018-12-14 22:16:41,519 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60217
2018-12-14 22:16:41,519 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:41,525 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60217 (no session established for client)
KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaConsumerConfig.java
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
SpringKafkaTutorialspointApplication.java
@SpringBootApplication
public class SpringKafkaTutorialspointApplication implements CommandLineRunner{
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send("tutorialspoint", message);
}
public static void main(String[] args) {
SpringApplication.run(SpringKafkaTutorialspointApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
sendMessage("Hi Welcome to Spring For Apache Kafka @@@@@@@@@");
}
@KafkaListener(topics = "tutorialspoint", groupId = "group-id")
public void listen(String message) {
System.out.println("Received Messasge in group - group-id: " + message);
}
}
What can I try next to resolve this?
After spending many hours I found that I should be using below in KafkaConsumerConfig
too.
With the below changes code works fine.
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");