Search code examples
javaspring-bootapache-kafkaspring-kafka

Consumer not receiving Kafka SpringBoot messages


I have a problem. I have several microservices, sender and consumer and kafka. The sender receives an HTTP request and redirects to kafka. Everything works as it should, in the Kafka console I receive messages, but the SpringBoot client does not receive it. help me please

My KafkaConsumerConfig:

@Configuration
@EnableKafka
public class KafkaConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // allows a pool of processes to divide the work of consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testTopicGroup");
        // automatically reset the offset to the earliest offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    @Primary
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean(name = "kafkaListenerContainerFactoryWith6Consumer")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryWith6Consumer() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(6); //3 partition -> 6 thread in parallel in a single consumer
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean(name = "kafkaListenerContainerFactoryForBatchConsumer")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryForBatchConsumer() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setBatchListener(true);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Listener:

@Slf4j
public class KafkaListener {
    @org.springframework.kafka.annotation.KafkaListener(topics = "${kafka.topic.messageTopic}")
    public void receive(String payload){
        log.info("Message is received from Kafka: " + payload);
    }

ConsumerApplicationProperties:

kafka.bootstrap-servers=localhost:9092
server.port=8081
kafka.topic.messageTopic=test

Solution

  • My solution: In the listener, before the class name, I added @Component