Search code examples
spring-cloud-stream-binder-kafka

How to bind a Store using Spring Cloud Stream and Kafka?


I'd like to use a Kafka state store of type KeyValueStore in a sample application using the Kafka Binder of Spring Cloud Stream. According to the documentation, it should be pretty simple. This is my main class:

/**
 * Sample application from the question: it tries to receive a {@code KeyValueStore}
 * as the second input of a functional Kafka Streams processor.
 *
 * <p>This is the version that fails at startup; the reported error names
 * {@code org.apache.kafka.streams.state.KeyValueStore} as a binding target type
 * for which no factory is registered.
 */
@SpringBootApplication
public class KafkaStreamTestApplication {

    /** Boots the Spring application. */
    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamTestApplication.class, args);
    }

    /**
     * Processor that upper-cases every value of the input stream.
     *
     * <p>The second function argument is declared as a {@code KeyValueStore}, but the
     * lambda never uses it. Declaring it as an input is what the startup error
     * complains about: the binder treats each function input as a binding target,
     * and {@code KeyValueStore} is not among the registered factories listed in
     * the error message.
     */
    @Bean
    public BiFunction<KStream<String, String>, KeyValueStore<String,String>, KStream<String, String>> process(){
        return (input,store) -> input.mapValues(v -> v.toUpperCase());

    }


    /**
     * Builder for a persistent key-value store named "my-store" with String
     * serdes for both keys and values.
     */
    // NOTE(review): raw StoreBuilder type — presumably StoreBuilder<?> was intended; confirm.
    @Bean
    public StoreBuilder myStore() {
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("my-store"), Serdes.String(),
                Serdes.String());
    }
}

I suppose that the KeyValueStore should be passed as the second parameter of the "process" method, but the application fails to start with the message below:

Caused by: java.lang.IllegalStateException: No factory found for binding target type: org.apache.kafka.streams.state.KeyValueStore among registered factories: channelFactory,messageSourceFactory,kStreamBoundElementFactory,kTableBoundElementFactory,globalKTableBoundElementFactory
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.getBindingTargetFactory(AbstractBindableProxyFactory.java:82) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.bindInput(KafkaStreamsBindableProxyFactory.java:191) ~[spring-cloud-stream-binder-kafka-streams-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.afterPropertiesSet(KafkaStreamsBindableProxyFactory.java:103) ~[spring-cloud-stream-binder-kafka-streams-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1855) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1792) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]

Solution

  • I found out how to use a state store by reading a unit test in Spring Cloud Stream.

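    The code below shows how I applied that solution to my code. Instead of declaring the store as a function input, the store name is passed to transformValues, which makes the store available to the transformer. The transformer then uses the store provided by the Spring bean method "myStore".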

    /**
     * Sample application that wires a Kafka Streams state store into a functional
     * processor by store name instead of declaring the store as a function input.
     */
    @SpringBootApplication
    public class KafkaStreamTestApplication {

        /** Name shared by the store builder bean and the transformer that looks the store up. */
        public static final String MY_STORE_NAME = "my-store";

        /** Boots the Spring application. */
        public static void main(String[] args) {
            SpringApplication.run(KafkaStreamTestApplication.class, args);
        }

        /**
         * Processor that routes every value through {@link MyValueTransformer}.
         *
         * <p>{@link #MY_STORE_NAME} is passed as the state store name so the
         * transformer can obtain that store from its processor context.
         */
        @Bean
        public Function<KStream<String, String>, KStream<String, String>> process2() {
            return input -> input.transformValues(MyValueTransformer::new, MY_STORE_NAME);
        }

        /**
         * Builder for the persistent key-value store named {@link #MY_STORE_NAME},
         * using String serdes for both keys and values.
         */
        @Bean
        public StoreBuilder<?> myStore() {
            return Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(MY_STORE_NAME),
                    Serdes.String(),
                    Serdes.String());
        }
    }
    
    /**
     * Value transformer that upper-cases each incoming value and caches the result in
     * the key-value state store named {@link KafkaStreamTestApplication#MY_STORE_NAME}.
     *
     * <p>The store is keyed by the original value and holds its upper-cased form, so a
     * value that was seen before is answered from the store.
     */
    public class MyValueTransformer implements ValueTransformer<String, String> {

        /** Store obtained from the processor context in {@link #init(ProcessorContext)}. */
        private KeyValueStore<String, String> store;

        /**
         * Looks up the state store by name.
         *
         * @param context processor context supplied by Kafka Streams
         */
        @Override
        // The store is registered with String serdes for keys and values, so the cast is safe.
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, String>)
                    context.getStateStore(KafkaStreamTestApplication.MY_STORE_NAME);
        }

        /**
         * Returns the upper-cased form of {@code value}, storing it on first sight.
         *
         * @param value record value; may be {@code null}
         * @return the upper-cased value, or {@code null} when {@code value} is {@code null}
         */
        @Override
        public String transform(String value) {
            // A null value cannot be used as a store key; pass it through unchanged.
            if (value == null) {
                return null;
            }

            String cached = store.get(value);
            if (cached != null) {
                return cached;
            }

            // First occurrence: compute, remember, and return the result (the previous
            // version returned the missed lookup, i.e. null, for every new value).
            String upperCased = value.toUpperCase();
            store.put(value, upperCased);
            return upperCased;
        }

        /**
         * Nothing to release: the state store's lifecycle is managed by Kafka Streams,
         * so it must not be closed here.
         */
        @Override
        public void close() {
            // Intentionally empty.
        }

    }