Search code examples
apache-kafkaapache-kafka-streamsrocksdb

How to get a sorted KeyValueStore from a KTable?


I want to materialize a KTable from KStream and I want the KeyValueStore to be sorted by the Key.

I tried looking up the KTable API Spec (https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KTable.html), but no 'sort'-method exists. I also looked up this article (https://dzone.com/articles/how-to-order-streamed-dataframes) that suggests implementing sorting via the Processor API. However, I am checking to see if this can be achieved some other way ?


Solution

  • KafkaStream allows you materialized queriable state stores. You can then get a read only access to a store by invoking the method kafkaStream#store().

    If you define persistant store, KafkaStreams will use RocksDB to store your data. The returned KeyValueIterator instance will used a RocksDB iterator that will allow you to iterate over the keys-values in a sorted manner Rocks Iterator-Implementation.

    Example :

        KafkaStreams streams = new KafkaStreams(topology, props);
        ReadOnlyKeyValueStore<Object, Object> store = streams.store("storeName", QueryableStoreTypes.keyValueStore());
        KeyValueIterator<Object, Object> iterator = store.all();