Initial Question: How can I bind my GlobalStateStore to a processor? My application has a GlobalStateStore with its own processor ("GlobalConfigProcessor") that keeps the store up to date. I also have another processor ("MyClassProcessor"), which is called in my consumer function. When I try to access the store from MyClassProcessor, I get an exception saying: Invalid topology: StateStore config_statestore is not added yet.
Update on current situation: I set up a test repository to give a better overview of my situation. It can be found here: https://github.com/fx42/store-example
As you can see in the repo, I have two consumers, each consuming a different topic. The config topic provides an event that I want to write to a GlobalStateStore; StateStoreUpdateConsumer.java and StateStoreProcessor.java are the classes involved. With MyClassEventConsumer.java I process another input topic and want to read values from the GlobalStateStore. According to the docs, I can't register a GlobalStateStore simply as a StateStore bean; instead I have to add it explicitly through a StreamsBuilderFactoryBeanCustomizer bean. This code is currently commented out in StreamConfig.java. Without this code I get the exception:
org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
With the code enabled, I get this exception instead:
org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
This leads me to the conclusion that I have a configuration problem and the topology is messed up.
Questions:
    streamBuilder.addGlobalStore(storeBuilder, configInputTopic,
            Consumed.with(Serdes.String(), Serdes.String()),
            () -> new StateStoreProcessor(statestoreName));
Do I have to provide a consumer function for this processor, or do I even have to mention it in the function configuration/application.yml?
Is there a way NOT to pass a ProcessorSupplier to the addGlobalStore call and just use the functional way for this?
How can I handle this GlobalStateStore if the two defined functions end up in two different topologies?
Here is the commented-out StreamsBuilderFactoryBeanCustomizer bean which I use to add the GlobalStateStore to the FactoryBean:
    @Bean
    StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer(
            StoreBuilder<KeyValueStore<String, String>> storeBuilder) {
        return factoryBean -> {
            try {
                // Fetch the StreamsBuilder and register the global store on it.
                var streamBuilder = factoryBean.getObject();
                streamBuilder.addGlobalStore(storeBuilder, configInputTopic,
                        Consumed.with(Serdes.String(), Serdes.String()),
                        () -> new StateStoreProcessor(statestoreName));
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    }
I figured out my problem: it was the @EnableKafkaStreams annotation I was using. I assume it caused two different contexts to run in parallel, and they collided. I also needed to use StreamsBuilderFactoryBeanConfigurer instead of StreamsBuilderFactoryBeanCustomizer to get the GlobalStateStore registered correctly.
These changes are now in the linked test repo, which can start the application context properly.
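For anyone who does not want to dig through the repo, here is a rough sketch of what the change might look like. It is not copied from the repository: it reuses the bean body from the question and only swaps the bean type to StreamsBuilderFactoryBeanConfigurer. The topic name "config-topic" is a hypothetical placeholder, StateStoreProcessor is the class from the repo (so this compiles only inside that project), and the class deliberately carries no @EnableKafkaStreams annotation.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;

// Note: no @EnableKafkaStreams here; in my case it seemed to start a second,
// colliding Kafka Streams context next to the one created by the binder.
@Configuration
public class StreamConfig {

    // Store name taken from the exception message in the question.
    private final String statestoreName = "config_statestore";
    // Hypothetical topic name; use whatever the config topic is really called.
    private final String configInputTopic = "config-topic";

    @Bean
    StreamsBuilderFactoryBeanConfigurer globalStoreConfigurer(
            StoreBuilder<KeyValueStore<String, String>> storeBuilder) {
        return factoryBean -> {
            try {
                // Same registration as in the question, only the bean type changed.
                StreamsBuilder streamsBuilder = factoryBean.getObject();
                streamsBuilder.addGlobalStore(storeBuilder, configInputTopic,
                        Consumed.with(Serdes.String(), Serdes.String()),
                        () -> new StateStoreProcessor(statestoreName));
            } catch (Exception e) {
                // Fail loudly instead of printing the stack trace and continuing.
                throw new IllegalStateException(
                        "Could not register the global state store", e);
            }
        };
    }
}
```

Rethrowing in the catch block is my own choice here: swallowing the exception, as in the original snippet, would hide a failed store registration until the topology is started.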