Search code examples
spring-kafkaapache-kafka-streams

Error when starting Spring Boot application that implements Kafka Streams


I am working on a Spring Boot application and trying to implement Kafka Streams for the first time. This is my kafka configuration class:

`

@Autowired
NumberDetectionService numberDetectionService;

private static final String INPUT_TOPIC = "nn_input";
private static final String OUTPUT_TOPIC = "nn_output";
private static final String STORE_NAME = "ImageDtoStore";


@Bean
public Topology getTopology() {

    final StreamsBuilder builder = new StreamsBuilder();

    StoreBuilder<KeyValueStore<String, ImageDto>> storeBuilder =
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(STORE_NAME),
                    Serdes.String(),
                    ImageDtoSerde.serde()
            );
    builder.addStateStore(storeBuilder);

    builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), ImageDtoSerde.serde())).transform(() -> new ImageDtoTransformer(numberDetectionService), STORE_NAME)
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), ImageDtoSerde.serde()));
    System.out.println(builder.build().describe());
    return builder.build();
}
@Bean
public KafkaStreams kafkaStreams() {
    //final StreamsBuilder builder = streamsBuilder();
    KafkaStreams streams = new KafkaStreams(getTopology() , streamsConfiguration());
    streams.cleanUp();
    streams.start();
    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    return streams;
}

private Properties streamsConfiguration() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "neural-streams");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    return props;
}



@Bean
public NewTopic nnTopicIn() {
    return TopicBuilder.name(INPUT_TOPIC)
            .partitions(1).replicas(1).config(TopicConfig.RETENTION_MS_CONFIG,"3600000").build();
}

@Bean
public NewTopic nnTopicOut() {
    return TopicBuilder.name(OUTPUT_TOPIC).partitions(1).replicas(1).config(TopicConfig.RETENTION_MS_CONFIG,"3600000").build();
}

`

When I start the application, I get the following stack trace:

org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table. at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.14.jar:5.3.14] at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na] at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.5.8.jar:2.5.8] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:765) ~[spring-boot-2.5.8.jar:2.5.8] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:445) ~[spring-boot-2.5.8.jar:2.5.8] at org.springframework.boot.SpringApplication.run(SpringApplication.java:338) ~[spring-boot-2.5.8.jar:2.5.8] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1354) ~[spring-boot-2.5.8.jar:2.5.8] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343) ~[spring-boot-2.5.8.jar:2.5.8] at MultilayerPerceptron.MultiLayerPerceptronApplication.main(MultiLayerPerceptronApplication.java:20) ~[classes/:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na] at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.5.8.jar:2.5.8] Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table. at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:359) ~[spring-kafka-2.8.0.jar:2.8.0] at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.14.jar:5.3.14] ... 19 common frames omitted Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table. at org.apache.kafka.streams.KafkaStreams.getNumStreamThreads(KafkaStreams.java:948) ~[kafka-streams-2.8.0.jar:na] at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:854) ~[kafka-streams-2.8.0.jar:na] at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:711) ~[kafka-streams-2.8.0.jar:na] at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:337) ~[spring-kafka-2.8.0.jar:2.8.0] ... 20 common frames omitted

Is there something more that needs to be configured?


Solution

  • Looks like you do all the Kafka Streams stuff manually, therefore you don't need whatever Spring Boot can auto-configure for you.

    So, consider to remove that @EnableKafkaStreams. Or try to configure your streams the way it is required by Spring for Apache Kafka.

    See more in docs:

    https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#messaging.kafka.streams

    https://docs.spring.io/spring-kafka/reference/html/#streams-kafka-streams