Tags: java, spring-boot, spring-cloud, spring-cloud-stream, spring-cloud-stream-binder-kafka

Binding GlobalStateStore into Processor with spring-cloud-stream-binder-kafka


Initial question: How can I bind my GlobalStateStore to a processor? My application has a GlobalStateStore with its own processor ("GlobalConfigProcessor") to keep the store up to date. I also have another processor ("MyClassProcessor") which is called in my consumer function. When I try to access the store from MyClassProcessor, I get an exception saying: Invalid topology: StateStore config_statestore is not added yet.

Update on the current situation: I set up a test repository to give a better overview of my situation. It can be found here: https://github.com/fx42/store-example

As you can see in the repo, I have two consumers which consume different topics. The config topic provides events which I want to write to a GlobalStateStore; StateStoreUpdateConsumer.java and StateStoreProcessor.java are involved here. With MyClassEventConsumer.java I process another input topic and want to read values from the GlobalStateStore. As described in this doc, I can't initialize a GlobalStateStore simply as a StateStore bean; instead I have to add it explicitly via a StreamsBuilderFactoryBeanCustomizer bean. This code is currently commented out in StreamConfig.java. Without it I get the exception:

org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.

If the code is in use I get the exception:

org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory

This leads me to conclude that I have a configuration problem and the topology is messed up.
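For context, the reading side in MyClassProcessor would look roughly like the sketch below. It looks the global store up from the processor context in init(); this only succeeds once the store has actually been added to the topology, which is exactly what the "StateStore config_statestore is not added yet" exception complains about. The class name, store name, and String serdes follow the question; the surrounding wiring is assumed.

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch of the reading side: the processor fetches the global store
// from its context in init(). If the store was never registered on the
// topology, getStateStore throws at this point.
public class MyClassProcessor implements Processor<String, String, String, String> {

    private KeyValueStore<String, String> configStore;

    @Override
    public void init(ProcessorContext<String, String> context) {
        // "config_statestore" is the store name from the question
        configStore = context.getStateStore("config_statestore");
    }

    @Override
    public void process(Record<String, String> record) {
        String config = configStore.get(record.key());
        // ... apply the configuration to the incoming event ...
    }
}
```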

Questions:

  1. When I provide the processor for the GlobalStateStore directly via

streamBuilder.addGlobalStore(storeBuilder, configInputTopic,
                            Consumed.with(Serdes.String(), Serdes.String()), () -> new StateStoreProcessor(statestoreName));

do I have to provide a consumer function for this processor, or do I even have to mention it in the function configuration/application.yml?

  2. Is there a way NOT to provide a ProcessorSupplier to the addGlobalStore call and just use the functional style for this?

  3. How can I handle this GlobalStateStore if the two defined functions produce two different topologies?

Here is the commented-out StreamsBuilderFactoryBeanCustomizer bean which I use to add the GlobalStateStore to the factory bean:

@Bean
StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer(
            StoreBuilder<KeyValueStore<String, String>> storeBuilder) {

        return factoryBean -> {
            try {
                var streamBuilder = factoryBean.getObject();
                streamBuilder.addGlobalStore(storeBuilder, configInputTopic,
                        Consumed.with(Serdes.String(), Serdes.String()), () -> new StateStoreProcessor(statestoreName));

            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    }

Solution

  • I figured out my problem. It was the @EnableKafkaStreams annotation I used; I assume it caused two different contexts to run in parallel and collide. I also needed to use StreamsBuilderFactoryBeanConfigurer instead of StreamsBuilderFactoryBeanCustomizer to get the GlobalStateStore registered correctly. These changes are in the linked test repo, which now starts the application context properly.
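For completeness, the working variant of the bean would look roughly like the sketch below (StreamsBuilderFactoryBeanConfigurer is the spring-kafka interface mentioned in the answer; configInputTopic, statestoreName, and StateStoreProcessor are the names from the question and are assumed to exist in the surrounding class):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;

// Registers the global store on the StreamsBuilder before the topology is
// built, replacing the StreamsBuilderFactoryBeanCustomizer from the question.
// Per the answer, @EnableKafkaStreams must NOT be present on the
// configuration class, since the binder manages the factory bean itself.
@Bean
StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer(
        StoreBuilder<KeyValueStore<String, String>> storeBuilder) {
    return factoryBean -> {
        try {
            factoryBean.getObject().addGlobalStore(
                    storeBuilder,
                    configInputTopic,
                    Consumed.with(Serdes.String(), Serdes.String()),
                    () -> new StateStoreProcessor(statestoreName));
        } catch (Exception e) {
            // Fail fast instead of swallowing the error like the original snippet
            throw new IllegalStateException("Could not add global store", e);
        }
    };
}
```

Rethrowing instead of printStackTrace() makes a misconfigured topology fail at startup rather than surfacing later as a confusing runtime exception.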