Spring kafka - Failed to construct kafka consumer


I'm having trouble getting my Spring Boot project (built with Gradle, using Kafka and Confluent) up and running. I originally had just a producer in this test project and everything ran well. I then added a Kafka consumer, and now I get an exception on startup. Can anyone spot the problem?

First, here is the stack trace:

2021-01-22 19:56:08.566  WARN 61123 --- [           main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
2021-01-22 19:56:08.573  INFO 61123 --- [           main] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2021-01-22 19:56:08.575  INFO 61123 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
2021-01-22 19:56:08.576  INFO 61123 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2021-01-22 19:56:08.584  INFO 61123 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
2021-01-22 19:56:08.586  INFO 61123 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2021-01-22 19:56:08.597  INFO 61123 --- [           main] ConditionEvaluationReportLoggingListener : 

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-01-22 19:36:06.216 ERROR 61013 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
    at com.test.DevlyAuthServiceApplication.main(DevlyAuthServiceApplication.java:39)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:358)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:326)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:302)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:269)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:243)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:639)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:305)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 14 common frames omitted
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/IsolationLevel
    at io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor.configure(MonitoringConsumerInterceptor.java:46)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:376)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:436)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:417)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:404)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:711)
    ... 28 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.requests.IsolationLevel
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:602)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    ... 34 common frames omitted

This is my build.gradle:

plugins {
    id 'org.springframework.boot' version '2.3.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
    id "com.commercehub.gradle.plugin.avro" version "0.21.0"
    id "idea"

}
group 'com.test.tge-auth-service'
version '1.0'

java {
    sourceCompatibility = JavaVersion.VERSION_14
    targetCompatibility = JavaVersion.VERSION_14
}

ext {
    avroVersion = "1.10.1"
}

repositories {
    mavenCentral()
    jcenter()
    maven {
        url "https://packages.confluent.io/maven/"
    }
}

avro {
    createSetters = true
    fieldVisibility = "PRIVATE"
}

dependencies {
//    providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat'

    compile group: 'co.elastic.logging', name: 'logback-ecs-encoder', version: '0.5.2'
    compile group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.860'

    compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '3.0.0'
    compile group: 'io.springfox', name: 'springfox-boot-starter', version: '3.0.0'

    compile('org.springframework.boot:spring-boot-starter-data-elasticsearch')
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.data', name: 'spring-data-elasticsearch', version: '4.0.4.RELEASE'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-jpa', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-security', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.security', name: 'spring-security-oauth2-client', version: '5.4.0'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-validation', version: '2.4.2'
    compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.6.5'

    compile group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
    compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.11.2'

    compile group: 'mysql', name: 'mysql-connector-java', version: '8.0.21'

    compile group: 'io.jsonwebtoken', name: 'jjwt', version: '0.9.1'
    compile group: 'org.openapitools', name: 'jackson-databind-nullable', version: '0.2.1'

    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    compile group: 'org.apache.commons', name: 'commons-collections4', version: '4.4'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.11'
    compile group: 'org.passay', name: 'passay', version: '1.6.0'
    compile group: 'com.google.guava', name: 'guava', version: '30.0-jre'

    compile group: 'io.confluent', name: 'kafka-schema-registry-client', version: '5.4.0'
    compile group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.4.0'
    compile group: 'io.confluent', name: 'monitoring-interceptors', version: '5.4.0'
    compile(group: 'io.confluent', name: 'kafka-streams-avro-serde', version:'5.4.0') {
        exclude(module: 'log4j-over-slf4j')
    }

    compile "org.apache.avro:avro:1.10.1"
    implementation "org.apache.avro:avro:${avroVersion}"

    compileOnly 'org.projectlombok:lombok:1.18.12'
    annotationProcessor 'org.projectlombok:lombok:1.18.12'

    implementation 'com.amazonaws:aws-java-sdk-s3'
    implementation 'org.springframework.boot:spring-boot-starter-web'

    testCompile group: 'junit', name: 'junit', version: '4.12'
    testCompileOnly 'org.projectlombok:lombok:1.18.12'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.12'

    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }

    jar {
        manifest {
            attributes(
                    'Main-Class': 'com.test.SpringBootPersistenceApplication'
            )
        }
        from {
            configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
        }
    }
}

Here is my producer, which works:

import com.test.messages.avro.model.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@CommonsLog(topic = "Producer Logger")
@RequiredArgsConstructor
public class ProducerK {

  @Value("${topic.name}")
  private String TOPIC;

  private final KafkaTemplate<String, User> kafkaTemplate;

  void sendMessage(User user) {
    this.kafkaTemplate.send(this.TOPIC, "key", user);
    log.info(String.format("Produced user -> %s", user));
  }
}

And finally here is my Consumer:

import com.test.messages.avro.model.User;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
@CommonsLog(topic = "Consumer Logger")
public class Consumer {

    @KafkaListener(
            topics = "#{'${topic.name}'}",
            groupId = "simple-consumer"
    )
    public void consume(User record) throws IOException {
        log.info(String.format("Consumed message -> %s", record));
    }
}

In case it helps, here is my application.yaml file as well:

topic:
  name: users-2kb
  partitions-num: 3
  replication-factor: 1
spring:
  kafka:
    properties:
      bootstrap.servers: localhost:9092
      schema.registry.url: http://localhost:8081
    consumer:
      group-id: my-microservice
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        specific.avro.reader: true
        interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    template:
      default-topic:
logging:
  level:
    root: info

Thank you for any help you can provide (I'm going out of my mind with this :D)


Solution

  • Boot 2.3 uses spring-kafka 2.5 by default (and kafka-clients 2.5.0). Since you have overridden its prescribed spring-kafka version to 2.6.5, you must override all of the Kafka dependencies to match: kafka-clients 2.6.1 and kafka-streams 2.6.1 (if you are using them).

    If you are using the embedded Kafka broker in tests, you also need several additional jars. See https://docs.spring.io/spring-kafka/docs/current/reference/html/#update-deps

    spring-kafka 2.6.x is the version Boot 2.4 uses, so moving to Boot 2.4 will bring in all the right versions.

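    Confluent 5.4 uses Kafka 2.4, so its monitoring interceptor was built against older Kafka classes. That is why the stack trace ends in a ClassNotFoundException for org.apache.kafka.common.requests.IsolationLevel: that class is not in the kafka-clients jar on your classpath.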

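    You should use the version of Confluent that matches Spring Boot's prescribed versions of spring-kafka and kafka-clients.

    If you use Boot 2.4.x, use Confluent 6.0. If you stay on Boot 2.3.x (kafka-clients 2.5), the matching release is Confluent 5.5; the table linked below lists the pairings.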

    https://docs.confluent.io/platform/current/installation/versions-interoperability.html
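
To check which jars actually end up on the runtime classpath after changing versions, a small diagnostic along these lines may help. It is only a sketch: the class name ClasspathCheck and the helper locate are made up for illustration, and the candidate class names are taken from the stack trace and configuration in the question. It uses only the JDK, and it needs to run with the same classpath as the application (for example from a test in the same project) to be meaningful.

```java
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical helper (not part of Spring or Kafka): reports where a class is loaded from.
public class ClasspathCheck {

    /** Returns the jar or directory a class comes from, or empty if it cannot be loaded. */
    public static Optional<String> locate(String className) {
        try {
            // initialize=false: resolve the class without running its static initializers
            Class<?> type = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // Classes that ship with the JDK have no code source; label them "<jdk>"
            return Optional.of(source == null ? "<jdk>" : String.valueOf(source.getLocation()));
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String[] candidates = {
            // Class the Confluent interceptor failed to find, per the stack trace
            "org.apache.kafka.common.requests.IsolationLevel",
            // Shows which kafka-clients jar is in use
            "org.apache.kafka.clients.consumer.KafkaConsumer",
            // Shows which monitoring-interceptors jar is in use
            "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
        };
        for (String name : candidates) {
            System.out.println(name + " -> " + locate(name).orElse("NOT FOUND"));
        }
    }
}
```

The jar file names in the output normally include the version, so a kafka-clients jar that does not match the Kafka version your Confluent release expects points to the mismatch described above. Running Gradle's dependencies task for the runtimeClasspath configuration gives the same information from the build side.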