Search code examples
springspring-bootapache-kafkaspring-integration

Spring Integration with Kafka auto configuration issue


I am trying to configure a Spring Boot application to consume Kafka messages. After adding:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.3.RELEASE</version>
</dependency>

into my dependencies and with @EnableKafka and @KafkaListener(topics = "some-topic") annotations, I am getting the following error:

...
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'kafkaListenerContainerFactory' available

then I add the following configuration:

@Bean
public Map<String, Object> consumerConfigs() {

    Map<String, Object> propsMap = new HashMap<>();

    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return propsMap;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {

    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

The error is gone. However, I think I should be able to autoconfigure this with the spring.kafka.listener.* properties, as the documentation suggests.

If I cannot, I would like to use an autowired KafkaProperties. However, to be able to use it, I am adding:

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-autoconfigure</artifactId>
    <version>1.5.2.RELEASE</version>
</dependency>

Then it is available to import. When I try to use it as following:

@Autowired
private KafkaProperties kafkaProperties;

and in my method:

 return kafkaProperties.buildConsumerProperties();

I am getting the following error:

Caused by: java.lang.ClassNotFoundException: org.springframework.boot.context.annotation.DeterminableImports
.

which is a Maven dependency problem, I assume.

So my questions are:

  1. Is it possible to configure a Kafka configuration without creating @Beans but only with application.properties?
  2. If not, how can I skip manually creating the required Map object as above but simply use kafkaProperties.buildConsumerProperties() without getting the above error(2nd)?

Solution

  • It turned out to be a Maven issue, as I suspected. Basically, I work on a multi module project with the following structure:

    ────parent
        ├───parent.pom
        ├───module1
        |   └───module1.pom
        └───module2
            └───module2.pom
    

    and my parent.pom had another parent element which was:

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
    </parent>
    

    Basically replacing the above parent of parent with:

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <type>pom</type>
                <version>1.5.2.RELEASE</version>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    

    as suggested here resolved all of the issues (Both autoconfiguration and being able to use KafkaProperties).