apache-kafka-streams, rocksdb, stream-processing

Kafka Streams building StateStoreSupplier: API clarifications


I am using Kafka Streams version 0.11.0.2.

In order to leverage the transform API, I create my own StateStoreSupplier using the Stores.create builder method. The problem is that the javadoc for some of the fields/methods is not clear enough to me.

val storeSupplier = Stores.create(STORE_NAME)
            .withStringKeys()                           // String serde for keys
            .withStringValues()                         // String serde for values
            .persistent()                               // RocksDB-backed, on disk
            .disableLogging()                           // no changelog topic (see below)
            .windowed(WINDOW_SIZE, RETENTION, 3, false) // size, retention, segments, duplicates
            .enableCaching()                            // in-memory cache in front of RocksDB
            .build()

How would the mentioned changelog be represented?

/**
* Indicates that a changelog should not be created for the key-value store
*/
PersistentKeyValueFactory<K, V> disableLogging();

How do these 4 values affect each other? Does each window have a maximum number of elements, i.e. windowSize? Once it is reached, is a new window started? And can each window be divided into up to numSegments files on disk for RocksDB? Does duplicate mean that both key and value are the same, and is it detected only within the same window?

/**
 * Set the persistent store as a windowed key-value store
 * @param windowSize size of the windows
 * @param retentionPeriod the maximum period of time in milli-second to keep each window in this store
 * @param numSegments the maximum number of segments for rolling the windowed store
 * @param retainDuplicates whether or not to retain duplicate data within the window
 */
PersistentKeyValueFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates);

What kind of caching is implied here?

/**
* Caching should be enabled on the created store.
*/
PersistentKeyValueFactory<K, V> enableCaching();

Solution

  • I can confidently answer two of these three questions:

    How would the mentioned changelog be represented?

    The changelog is a topic named $applicationId-$storename-changelog. It is a plain key-value topic whose keys are the table keys and whose values are the table values. This topic is created and managed by Kafka Streams. If you call disableLogging, then to my knowledge the store will not be restorable if it is somehow lost, short of replaying your whole topology (if it is even replayable!).
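    For illustration, here is how the same builder would keep logging enabled instead, so that Kafka Streams creates and restores from that changelog topic. A minimal sketch, assuming the builder's enableLogging overload that takes per-topic configs; the "cleanup.policy" override and the "my-app" application id are my own assumptions, not anything from the question:

    val loggedStoreSupplier = Stores.create(STORE_NAME)
                .withStringKeys()
                .withStringValues()
                .persistent()
                // extra topic configs for the changelog; the override here is assumed
                .enableLogging(mapOf("cleanup.policy" to "compact"))
                .windowed(WINDOW_SIZE, RETENTION, 3, false)
                .enableCaching()
                .build()

    // With application.id = "my-app" and STORE_NAME = "my-store", the changelog
    // topic is named "my-app-my-store-changelog".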

    What kind of caching is implied here?

    In-memory LRU caching in front of the underlying RocksDB instance. See CachedStateStore and CachedKeyValueStore; CachedKeyValueStore#getInternal() is a good example.
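    How much is cached, and how often it flushes, is controlled through StreamsConfig. A minimal sketch, assuming a local broker and a made-up application id:

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    val props = Properties()
    props[StreamsConfig.APPLICATION_ID_CONFIG] = "my-app"            // assumed id
    props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092" // assumed broker
    // Total bytes available for record caches across all threads; 0 disables caching.
    props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 10L * 1024 * 1024
    // The commit interval also bounds how often cached stores flush (see the update below).
    props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 30_000L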

    With regard to:

    How do these 4 values affect each other? Does each window have a maximum number of elements, i.e. windowSize? Once it is reached, is a new window started? And can each window be divided into up to numSegments files on disk for RocksDB? Does duplicate mean that both key and value are the same, and is it detected only within the same window?

    I haven't looked at these internals recently enough to remember exactly, but I can say the following:

    • Each window does not have a maximum number of elements unless you are using an in-memory LRU store. Windows exist on a time basis, so entries fall into one or more windows based on their timestamps, not on window capacity (normally there is no fixed capacity). Update: an important thing to note is that if you are using a cached store, it is only flushed to disk periodically, at the interval specified by the offset commit interval. If such a cached store is backing a KTable, the KTable only forwards messages to its children when the topology commits and the store flushes.
    • Yes, I believe each window is divided into segments on disk, though I haven't looked at the code recently enough to be sure, and I could be wrong. See RocksDBSegmentedBytesStore and its dependency Segments; a rough sketch of this time-based bookkeeping follows the list.
    • Not sure about duplicates in this context.
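
    To make the time-based picture concrete, here is a rough sketch of tumbling-window membership and segment assignment. The constants are hypothetical and the segment formula is an assumption modeled loosely on the internal Segments class, not copied from it:

    // Hypothetical values; the question's WINDOW_SIZE and RETENTION are not shown.
    const val WINDOW_SIZE = 60_000L       // 1-minute windows
    const val RETENTION = 3_600_000L      // keep windows for 1 hour
    const val NUM_SEGMENTS = 3

    // A record falls into a window purely by timestamp, never by element count.
    fun windowStartFor(timestamp: Long): Long = timestamp - (timestamp % WINDOW_SIZE)

    // The retention period is split across segments so that expired windows can be
    // dropped by deleting whole segment files instead of individual RocksDB keys.
    val segmentInterval = RETENTION / (NUM_SEGMENTS - 1)  // assumed split; the real class may differ
    fun segmentIdFor(timestamp: Long): Long = timestamp / segmentInterval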