Search code examples
Tags: redis, apache-kafka-streams

How do you write data into a Redis custom state store using Kafka Streams


I've recently been learning about how to use the Kafka Streams client and one thing that I've been struggling with is how to switch from the default state store (RocksDB) to a custom state store using something like Redis. The Confluent documentation makes it clear you have to implement the StateStore interface for your custom store and you must provide an implementation of StoreBuilder for creating instances of that store.

Here's what I have so far for my custom store. I've also added a simple write method to append a new entry into a specified stream via the Redis XADD command.

public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
    // Name the store is registered under in the topology; must match the name
    // processors pass to context.getStateStore(...) and streams.store(...).
    private final String name;
    // volatile: isOpen() may be read from a different thread (e.g. the one
    // calling KafkaStreams.store(...)) than the stream thread that calls init()/close().
    private volatile boolean open = false;
    private final boolean loggingEnabled;

    public MyCustomStore(String name, boolean loggingEnabled) {
        this.name = name;
        this.loggingEnabled = loggingEnabled;
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        if (root != null) {
            // Register the store with a restore callback: records replayed from
            // the changelog topic are written straight back into Redis.
            context.register(root, (key, value) -> {
                write(key.toString(), value.toString());
            });
        }

        this.open = true;
    }

    @Override
    public void flush() {
        // No-op: each write() issues an XADD immediately, so there is
        // nothing buffered to flush.
    }

    @Override
    public void close() {
        // FIX: mark the store closed so isOpen() reflects the real lifecycle state.
        this.open = false;
    }

    @Override
    public boolean persistent() {
        // Data lives in Redis, outside the Streams instance's local state dir,
        // so it survives restarts.
        return true;
    }

    @Override
    public boolean isOpen() {
        // FIX: this was hardcoded to `false`, which made
        // KafkaStreams.store(...) throw InvalidStateStoreException even
        // after init() had run. Report the tracked lifecycle flag instead.
        return this.open;
    }

    @Override
    public void write(String key, String value) {
        // NOTE(review): opening a new connection per write is expensive —
        // consider a shared JedisPool created in init() and released in close().
        try(Jedis jedis = new Jedis("localhost", 6379)) {
            Map<String, String> hash = new HashMap<>();
            hash.put(key, value);
            // Append as a new entry to the Redis stream "MyStream".
            jedis.xadd("MyStream", StreamEntryID.NEW_ENTRY, hash);
        }
    }
}

public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<String,String>> {

    // Whether Streams should wrap the store with a cache.
    // NOTE(review): build() does not yet consume this flag — confirm intended.
    private boolean cached = true;
    private final String name;

    // Changelog topic configuration, exposed via logConfig().
    private Map<String,String> logConfig = new HashMap<>();
    private boolean loggingEnabled;

    public MyCustomStoreBuilder(String name, boolean loggingEnabled){
        this.name = name;
        this.loggingEnabled = loggingEnabled;
    }

    @Override
    public StoreBuilder<MyCustomStore<String,String>> withCachingEnabled() {
        this.cached = true;
        return this;
    }

    @Override
    public StoreBuilder<MyCustomStore<String,String>> withCachingDisabled() {
        this.cached = false;
        // FIX: this previously returned null, which would NPE any caller
        // chaining builder methods fluently.
        return this;
    }

    @Override
    public StoreBuilder<MyCustomStore<String,String>> withLoggingEnabled(Map<String, String> config) {
        this.loggingEnabled = true;
        // FIX: the supplied changelog config was previously discarded;
        // keep a defensive copy so logConfig() reports it.
        this.logConfig = new HashMap<>(config);
        return this;
    }

    @Override
    public StoreBuilder<MyCustomStore<String,String>> withLoggingDisabled() {
        this.loggingEnabled = false;
        return this;
    }

    @Override
    public MyCustomStore<String,String> build() {
        return new MyCustomStore<String,String>(this.name, this.loggingEnabled);
    }

    @Override
    public Map<String, String> logConfig() {
        return logConfig;
    }

    @Override
    public boolean loggingEnabled() {
        return loggingEnabled;
    }

    @Override
    public String name() {
        return name;
    }
}

And here's what my setup and topology look like.

@Bean
public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
        // Builds, starts, and exposes the KafkaStreams instance as a Spring bean.
        // NOTE(review): appName, schemaRegistryUrl, and appServerConfig are
        // fields/imports not visible here — confirm they are in scope.
        final Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        // NOTE(review): Long()/Double() presumably come from a static import of
        // org.apache.kafka.common.serialization.Serdes — verify; otherwise this
        // does not compile.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Long().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Double().getClass());
        props.put(StreamsConfig.STATE_DIR_CONFIG, "data");
        props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, appServerConfig);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
        // Log and skip records that fail deserialization instead of crashing the app.
        props.put(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // CAUTION: the name contains a typo ("custome") — it must match the name
        // passed to streams.store(...) exactly, or lookups will fail.
        final String storeName = "the-custome-store";

        Topology topology = new Topology();

        // Builder for the Redis-backed store registered under storeName.
        MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder(storeName, false);

        // NOTE(review): topic is "inputTopic" here but "input-topic" elsewhere
        // in this document — confirm the real topic name.
        topology.addSource("input","inputTopic");

        // The processor is a child of the source node and receives its records.
        topology.addProcessor("redis-processor", () -> new RedisProcessor(storeName), "input");

        // Attach the store to the processor(s) allowed to access it.
        topology.addStateStore(customStoreBuilder, "redis-processor");

        KafkaStreams kafkaStreams = new KafkaStreams(topology, props);
        kafkaStreams.start();

        return kafkaStreams;
    }
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<String,String>> {

    // Only stores backed by our Redis implementation match this store type.
    @Override
    public boolean accepts(StateStore stateStore) {
        return (stateStore instanceof MyCustomStore);
    }

    // Produce a read-only facade over the named store via the given provider.
    @Override
    public MyReadableCustomStore<String,String> create(final StateStoreProvider storeProvider, final String storeName) {
        final MyCustomStoreTypeWrapper<String, String> wrapper =
                new MyCustomStoreTypeWrapper<>(storeProvider, storeName, this);
        return wrapper;
    }
}
public class MyCustomStoreTypeWrapper<K,V> implements MyReadableCustomStore<K,V> {
    private final QueryableStoreType<MyReadableCustomStore<String, String>> customStoreType;
    private final String storeName;
    private final StateStoreProvider provider;

    public MyCustomStoreTypeWrapper(final StateStoreProvider provider,
                                    final String storeName,
                                    final QueryableStoreType<MyReadableCustomStore<String, String>> customStoreType) {

        this.provider = provider;
        this.storeName = storeName;
        this.customStoreType = customStoreType;
    }

    /**
     * Returns the most recently added entry in the Redis stream, or "" when
     * the stream is empty.
     *
     * NOTE(review): the {@code key} parameter is currently unused — the read
     * always returns the newest stream entry regardless of key. Confirm this
     * is the intended lookup semantics.
     */
    @Override
    public String read(String key) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // FIX: the original used xrange from ID 0-0, which returns the
            // OLDEST entries despite the comment claiming "most recent".
            // xrevrange iterates newest-first, so the first element is the
            // most recently added entry.
            StreamEntryID end = null; // null -> start from the newest entry
            StreamEntryID start = new StreamEntryID(0, 0);
            int count = 1;
            List<StreamEntry> list = jedis.xrevrange("MyStream", end, start, count);

            // FIX: jedis returns an empty list (not null) for an empty stream;
            // the old list.get(list.size() - 1) threw IndexOutOfBoundsException.
            if (list != null && !list.isEmpty()) {
                return list.get(0).toString();
            }

            System.out.println("No new data in the stream");
            return "";
        }
    }
}
// This throws the InvalidStateStoreException when I try to get access to the custom store
MyReadableCustomStore<String,String> store = streams.store("the-custome-store", new MyCustomStoreType<String,String>());
String value = store.read("testKey");

So, my question is how do I actually get the state store data to persist into Redis now? I feel like I'm missing something in the state store initialization or with the StateRestoreCallback. Any help or clarification with this would be greatly appreciated.


Solution

  • It looks to me that you have the store wired up to the topology correctly. But you don't have any processors using the store.

    It could look something like this:

    final String storeName = "the-custome-store";
    MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder(storeName, false);

    Topology topology = new Topology(); // FIX: the statement was missing its semicolon
    topology.addSource("input", "input-topic");

    // makes the processor a child of the source node
    // the source node forwards its records to the child processor node
    topology.addProcessor("redis-processor", () -> new RedisProcessor(storeName), "input");

    // add the store and specify the processor(s) that access the store
    // FIX: "storeBuilder" was undefined — the variable declared above is customStoreBuilder
    topology.addStateStore(customStoreBuilder, "redis-processor");
    
    
    class RedisProcessor implements Processor<byte[], byte[]> {
            // Store name to look up in init(); must match the name the store
            // was registered under via topology.addStateStore(...).
            final String storeName;
            MyCustomStore<byte[],byte[]> stateStore;

            public RedisProcessor(String storeName) {
                this.storeName = storeName;
            }

            @Override
            public void init(ProcessorContext context) {
              // FIX: "MyCustomeStore" was a typo — the class is MyCustomStore.
              stateStore = (MyCustomStore<byte[], byte[]>) context.getStateStore(storeName);
            }

            @Override
            public void process(byte[] key, byte[] value) {
                  // FIX: MyCustomStore.write takes (String, String), so decode
                  // the raw bytes explicitly; passing byte[] would not compile.
                  // NOTE(review): assumes UTF-8 encoded keys/values — confirm
                  // against the topic's serializers.
                  stateStore.write(new String(key, StandardCharsets.UTF_8),
                                   new String(value, StandardCharsets.UTF_8));
            }

            @Override
            public void close() {
                  // Nothing to release: the store manages its own connections.
            }
        }
    

    HTH, and let me know how it works out for you.

    Update to answer from comments:

    I think you need to update MyCustomStore.isOpen() to return the open variable. Right now it's hardcoded to return false

    
    @Override
    public boolean isOpen() {
        // TODO Auto-generated method stub
        return false;
    }