Search code examples

StateStore is never added on Spring cloud

Any Help how can I add state store on Spring cloud

I always receive this error "nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore myStore is not added yet."

Here is the bean definition however it never works

  public StoreBuilder storeBuilder() {
    KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("mystore");
    StoreBuilder<KeyValueStore<String, MyData>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), StreamsSerde.MyDataSerde());
    return storeBuilder;

Here is the Serde

public static final class MyDataSerde extends Serdes.WrapperSerde<MyData> {
    public MyDataSerde() {
      super(new JsonSerializer<>(), new JsonDeserializer<>(MyData.class));

Here is the data class

public class MyData {
  private String name;
  private String course;

Here is the spring cloud dependencies

springBootVersion = "2.2.5.RELEASE"
set('springCloudVersion', "Hoxton.SR3")

implementation group:"", name: "spring-cloud-stream"
    implementation group: "", name: "spring-cloud-stream-binder-kafka-streams"
    implementation group: "", name: "spring-cloud-starter-stream-kafka" 


  • I found a solution to add the store programmatically on this article

    public void initializeStateStores() throws Exception {
       StreamsBuilderFactoryBean streamsBuilderFactoryBean =
             applicationContext.getBean("&stream-builder-requestListener", StreamsBuilderFactoryBean.class);
       StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
       StoreBuilder<KeyValueStore<String, Long>> keyValueStoreBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName), Serdes.String(), Serdes.Long());