java, apache-kafka, spring-kafka

Unsure how to activate a Kafka consumer's consume() method


Edit: the root cause was that the condition referred to a property path that did not exist. The Consumer.java class now has this and it is working:

@ConditionalOnProperty(value = "aaronshaver.kafka.consumer-enabled", havingValue = "true")


I have a simple Kafka setup with a producer that is definitely producing. I confirmed this with this code:

SendMessageTask.java

        ListenableFuture<SendResult<String, String>> listenableFuture =
                this.producer.sendMessage("INPUT_DATA", "IN_KEY", LocalDate.now().toString());

        // Block until the send completes, then log the metadata of the stored record.
        SendResult<String, String> result = listenableFuture.get();
        RecordMetadata metadata = result.getRecordMetadata();
        logger.info(String.format("\nProduced:\ntopic: %s\noffset: %d\npartition: %d\nvalue size: %d\n",
                metadata.topic(), metadata.offset(), metadata.partition(), metadata.serializedValueSize()));

Messages show up in the console with the contents of the send result.

What I can't figure out is how the consumer's Kafka listener method gets called, or why it's not getting called. Here's the consumer (I tried a couple of different listener methods):

Consumer.java

@Service
@ConditionalOnProperty(value = "example.kafka.consumer-enabled", havingValue = "true")
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(topics = "INPUT_DATA", groupId = "fooGroup")
    public void listenGroupFoo(String message) {
        System.out.println("Received Message in group fooGroup: " + message);
    }

    @KafkaListener(topics = {"INPUT_DATA"})
    public void consume(
            final @Payload String message,
            final @Header(KafkaHeaders.OFFSET) Integer offset,
            final @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
            final @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            final @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            final @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
            final Acknowledgment acknowledgment
            ) {
        logger.info(String.format("#### -> Consumed message -> TIMESTAMP: %d\n%s\noffset: %d\nkey: %s\npartition: %d\ntopic: %s", ts, message, offset, key, partition, topic));
        acknowledgment.acknowledge();
    }
}

I tried this config file:

KafkaConsumerConfig.java

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                "fooGroup");
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

I feel like the @KafkaListener annotated method(s) should be called but I may be misunderstanding how it works, or have something misconfigured.

For good measure, here's the file that has some Spring and Kafka properties set up:

main/resources/application.yml

aaronshaver:
  kafka:
    consumer-enabled: ${consumer-enabled:true}
  spring:
    kafka:
      bootstrap-servers: ${kafka_bootstrap_servers:localhost:29092}
      properties:
        sasl:
          jaas:
            config: org.apache.kafka.common.security.plain.PlainLoginModule required username=${kafka_username:'admin'} password=${kafka_password:'admin-secret'};
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
        consumer:
          auto-offset-reset: earliest
          group-id: aaronshaver
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          max-poll-records: 1
          fetch-max-wait: 36000
          enable-auto-commit: false
          client-id: aaronshaver
        producer:
          client-id: aaronshaver
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          retries: 2
        jaas:
          enabled: true
        listener:
          poll-timeout: 1800000
          concurrency: 1
          ack-mode: manual_immediate

Any ideas? Thanks!


Solution

  • Your problem is here:

    @ConditionalOnProperty(value = "example.kafka.consumer-enabled", havingValue = "true")
    

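    No such configuration property exists: the application.yml defines aaronshaver.kafka.consumer-enabled, not example.kafka.consumer-enabled. Because the condition never matches, Spring does not register the Consumer bean, so its @KafkaListener methods are never discovered.

    When I removed this condition, the application started to consume records from the topic.

    Please revise either the property name in the condition or the configuration in your application.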
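
To make the matching rule concrete, here is a minimal plain-Java sketch of how a property condition with havingValue behaves when the property is missing. It is a simplified model for illustration only, not Spring Boot's actual implementation; the class PropertyConditionSketch and its matches helper are hypothetical names, and the map stands in for the resolved application.yml.

```java
import java.util.Map;

// Simplified model of the matching rule; NOT Spring Boot's actual implementation.
final class PropertyConditionSketch {

    // Hypothetical helper: true means a bean guarded by the condition would be registered.
    static boolean matches(Map<String, String> properties, String name,
                           String havingValue, boolean matchIfMissing) {
        String actual = properties.get(name);
        if (actual == null) {
            // Property absent: the condition only matches when matchIfMissing is true
            // (it is false unless set explicitly on the annotation).
            return matchIfMissing;
        }
        if (havingValue.isEmpty()) {
            // No havingValue given: any value other than "false" matches.
            return !"false".equalsIgnoreCase(actual);
        }
        return havingValue.equalsIgnoreCase(actual);
    }

    public static void main(String[] args) {
        // Stand-in for the resolved application.yml from the question.
        Map<String, String> props = Map.of("aaronshaver.kafka.consumer-enabled", "true");

        // Property path used in the original Consumer.java: absent, so no bean.
        System.out.println(matches(props, "example.kafka.consumer-enabled", "true", false)); // false

        // Property path from the question's edit: present and equal to "true".
        System.out.println(matches(props, "aaronshaver.kafka.consumer-enabled", "true", false)); // true
    }
}
```

When a listener is silent like this, starting the application with --debug prints Spring Boot's condition evaluation report, where a bean skipped by a failed condition should appear among the negative matches.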