Search code examples
springspring-bootapache-kafkaapache-kafka-streamsspring-cloud-stream

Spring Cloud Stream Kafka Streams Binder KafkaException: Could not start stream: 'listener' cannot be null


I am new to Kafka Streams and Spring Cloud Stream but have read good things about it in terms of moving the integration related codes into properties file so devs can focus mostly on the business logic side of things.

Here I have my simple application class.

package com.some.events.consumer

import com.some.events.SomeEvent
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.function.Consumer

@SpringBootApplication
class ConsumerApplication {
    @Bean
    fun consume(): Consumer<KStream<String, SomeEvent>> {
        return Consumer { input -> input.foreach { key, value -> println("Key: $key, value: $value") } }
    }
}

fun main(args: Array<String>) {
    runApplication<ConsumerApplication>(*args)
}

My application.yml file is as follows.

spring:
  cloud:
    function:
      definition: consume
    stream:
      bindings:
        consume-in-0:
          destination: "some-event"
          group: "some-event"

My dependencies in build.gradle.kts are defined as follows (just included the relevant ones here).

extra["springCloudVersion"] = "2020.0.2"

dependencies {
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}

When I run the application I got the following exception.

org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.5.jar:5.3.5]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
    at com.some.events.consumer.ConsumerApplicationKt.main(ConsumerApplication.kt:22) ~[main/:na]
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:94) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.5.jar:5.3.5]
    ... 14 common frames omitted
Caused by: java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.5.jar:5.3.5]
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.addListener(StreamsBuilderFactoryBean.java:268) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:84) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    ... 15 common frames omitted


Process finished with exit code 1

Note that I am aware that I need to configure the Serde and Avro related things (I am using Avro for event schema), but the thing is, the stream won't even run.

Can someone point me in the right direction? I tried googling this but no one has posted an issue where it's caused by 'listener' cannot be null. Thanks!


Solution

  • This is a bug; it is fixed in the 3.1.3-SNAPSHOT

    https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/f25dbff2b7fc0d0c742dd674a9e392057a34c86d

    https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1030#issuecomment-804039087

    I am not sure about the comment there; adding micrometer to the class path should resolve it.