I am working on a Spring Boot application and trying to implement Kafka Streams for the first time. This is my kafka configuration class:
`
@Autowired
NumberDetectionService numberDetectionService;
private static final String INPUT_TOPIC = "nn_input";
private static final String OUTPUT_TOPIC = "nn_output";
private static final String STORE_NAME = "ImageDtoStore";
@Bean
public Topology getTopology() {
final StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, ImageDto>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(STORE_NAME),
Serdes.String(),
ImageDtoSerde.serde()
);
builder.addStateStore(storeBuilder);
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), ImageDtoSerde.serde())).transform(() -> new ImageDtoTransformer(numberDetectionService), STORE_NAME)
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), ImageDtoSerde.serde()));
System.out.println(builder.build().describe());
return builder.build();
}
@Bean
public KafkaStreams kafkaStreams() {
//final StreamsBuilder builder = streamsBuilder();
KafkaStreams streams = new KafkaStreams(getTopology() , streamsConfiguration());
streams.cleanUp();
streams.start();
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
return streams;
}
private Properties streamsConfiguration() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "neural-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return props;
}
@Bean
public NewTopic nnTopicIn() {
return TopicBuilder.name(INPUT_TOPIC)
.partitions(1).replicas(1).config(TopicConfig.RETENTION_MS_CONFIG,"3600000").build();
}
@Bean
public NewTopic nnTopicOut() {
return TopicBuilder.name(OUTPUT_TOPIC).partitions(1).replicas(1).config(TopicConfig.RETENTION_MS_CONFIG,"3600000").build();
}
`
When I start the application, I get the following stack trace:
org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table. at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.14.jar:5.3.14] at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na] at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.14.jar:5.3.14] at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.5.8.jar:2.5.8] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:765) ~[spring-boot-2.5.8.jar:2.5.8] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:445) ~[spring-boot-2.5.8.jar:2.5.8] at org.springframework.boot.SpringApplication.run(SpringApplication.java:338) ~[spring-boot-2.5.8.jar:2.5.8] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1354) ~[spring-boot-2.5.8.jar:2.5.8] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343) ~[spring-boot-2.5.8.jar:2.5.8] at MultilayerPerceptron.MultiLayerPerceptronApplication.main(MultiLayerPerceptronApplication.java:20) ~[classes/:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na] at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.5.8.jar:2.5.8] Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table. at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:359) ~[spring-kafka-2.8.0.jar:2.8.0] at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.14.jar:5.3.14] ... 19 common frames omitted Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table. at org.apache.kafka.streams.KafkaStreams.getNumStreamThreads(KafkaStreams.java:948) ~[kafka-streams-2.8.0.jar:na] at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:854) ~[kafka-streams-2.8.0.jar:na] at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:711) ~[kafka-streams-2.8.0.jar:na] at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:337) ~[spring-kafka-2.8.0.jar:2.8.0] ... 20 common frames omitted
Is there something more that needs to be configured?
Looks like you do all the Kafka Streams stuff manually, therefore you don't need whatever Spring Boot can auto-configure for you.
So, consider to remove that @EnableKafkaStreams
. Or try to configure your streams the way it is required by Spring for Apache Kafka.
See more in docs:
https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#messaging.kafka.streams
https://docs.spring.io/spring-kafka/reference/html/#streams-kafka-streams