I have a problem with a setup of several microservices: a sender, a consumer, and Kafka. The sender receives an HTTP request and forwards it to Kafka. That part works as it should: I can see the messages in the Kafka console consumer, but the Spring Boot consumer does not receive them. Please help.
My KafkaConsumerConfig:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // consumers sharing a group id divide the topic's partitions among themselves
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testTopicGroup");
        // start from the earliest offset when the group has no committed offset yet
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    // default factory: @KafkaListener uses the bean named "kafkaListenerContainerFactory" unless told otherwise
    @Bean
    @Primary
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean(name = "kafkaListenerContainerFactoryWith6Consumer")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryWith6Consumer() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // up to 6 consumer threads; with a 3-partition topic only 3 of them are assigned a partition
        factory.setConcurrency(6);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean(name = "kafkaListenerContainerFactoryForBatchConsumer")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryForBatchConsumer() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        // deliver records to the listener as a list per poll instead of one at a time
        factory.setBatchListener(true);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Listener:
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaListener {

    // fully qualified because the class itself is also named KafkaListener
    @org.springframework.kafka.annotation.KafkaListener(topics = "${kafka.topic.messageTopic}")
    public void receive(String payload) {
        log.info("Message is received from Kafka: {}", payload);
    }
}
ConsumerApplicationProperties:
kafka.bootstrap-servers=localhost:9092
server.port=8081
kafka.topic.messageTopic=test
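My solution: I added @Component to the listener, directly above the class declaration. Without it the class is never registered as a Spring bean, so Spring does not process its @KafkaListener method and no listener container is started for it. (This assumes the class sits in a package covered by component scanning.)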
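To illustrate why the annotation on the method is not enough by itself, here is a toy model in plain Java. It is only a sketch of the general idea, not how Spring is actually implemented: `ToyListener`, `ToyContext` and `ToyMessageListener` are hypothetical names made up for this example, with `ToyContext.register(...)` standing in for what @Component plus component scanning does for a real bean.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class ListenerRegistryDemo {

    // Hypothetical stand-in for @KafkaListener (not a Spring annotation).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface ToyListener {}

    // Hypothetical listener class: the annotation alone does nothing.
    static class ToyMessageListener {
        final List<String> received = new ArrayList<>();

        @ToyListener
        public void receive(String payload) {
            received.add(payload);
        }
    }

    // Hypothetical stand-in for the application context:
    // it can only scan objects that were registered with it.
    static class ToyContext {
        private final List<Object> beans = new ArrayList<>();

        void register(Object bean) {
            beans.add(bean);
        }

        // Calls every @ToyListener method on registered beans
        // and returns how many deliveries were made.
        int dispatch(String message) {
            int delivered = 0;
            for (Object bean : beans) {
                for (Method method : bean.getClass().getDeclaredMethods()) {
                    if (!method.isAnnotationPresent(ToyListener.class)) {
                        continue;
                    }
                    try {
                        method.setAccessible(true);
                        method.invoke(bean, message);
                        delivered++;
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
            return delivered;
        }
    }

    // The listener object exists, but the context never hears about it.
    static int deliveriesWithoutRegistration() {
        ToyContext context = new ToyContext();
        new ToyMessageListener();
        return context.dispatch("hello");
    }

    // Same listener, this time registered (the role @Component plays in Spring).
    static int deliveriesWithRegistration() {
        ToyContext context = new ToyContext();
        context.register(new ToyMessageListener());
        return context.dispatch("hello");
    }

    public static void main(String[] args) {
        System.out.println("without registration: " + deliveriesWithoutRegistration());
        System.out.println("with registration: " + deliveriesWithRegistration());
    }
}
```

Running `main` prints `without registration: 0` and then `with registration: 1`, which matches what I saw: the messages were in the topic the whole time, but nothing in the application was wired up to receive them until the listener became a bean.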