I am using Kafka Streams version 0.11.0.2.
In order to leverage the transform
API I create my own StateStoreSupplier
using the Stores.create
builder method. The problem is that the javadoc for some fields/methods is not clear enough to me.
val storeSupplier = Stores.create(STORE_NAME)
.withStringKeys()
.withStringValues()
.persistent()
.disableLogging()
.windowed(WINDOW_SIZE, RETENTION, 3, false)
.enableCaching()
.build()
How would the mentioned changelog be represented?
/**
* Indicates that a changelog should not be created for the key-value store
*/
PersistentKeyValueFactory<K, V> disableLogging();
How do these 4 values affect each other? Does each window have a max number of elements - windowSize
? Once it is reached, is a new window started? And can each window be divided into up to numSegments
files on disk for RocksDB? Does duplicate mean the same key and value, and is it detected only within the same window?
/**
* Set the persistent store as a windowed key-value store
* @param windowSize size of the windows
* @param retentionPeriod the maximum period of time in milli-second to keep each window in this store
* @param numSegments the maximum number of segments for rolling the windowed store
* @param retainDuplicates whether or not to retain duplicate data within the window
*/
PersistentKeyValueFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates);
What kind of caching is implied here?
/**
* Caching should be enabled on the created store.
*/
PersistentKeyValueFactory<K, V> enableCaching();
I can confidently answer 2/3 of these questions:
How would the mentioned changelog be represented?
The changelog is a topic named $applicationId-$storename-changelog
. It is a plain key-value topic where keys are the table keys and values are the table values. This topic is created and managed by Kafka Streams. If you call disableLogging
, to my knowledge the store will not be restorable if it is somehow lost, short of replaying your whole topology (if it is replayable!).
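The naming convention can be sketched as a one-liner. This is a simplification: the real derivation lives inside Kafka Streams (in ProcessorStateManager), and the helper name below is illustrative, not part of the public API.

```scala
// Hedged sketch: how Kafka Streams derives the changelog topic name
// for a state store. The helper name is illustrative only.
def changelogTopic(applicationId: String, storeName: String): String =
  s"$applicationId-$storeName-changelog"

// prints "my-app-my-store-changelog"
println(changelogTopic("my-app", "my-store"))
```

So with application.id set to my-app and a store named my-store, the changelog topic would be my-app-my-store-changelog.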
What kind of caching is implied here?
LRU in-memory caching before the underlying RocksDB instance is accessed. See CachedStateStore
and CachedKeyValueStore
, specifically CachedKeyValueStore#getInternal()
for example.
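To make the idea concrete, here is a toy LRU read-through cache in front of a "store" lookup. This is only a sketch of the concept, not Kafka's implementation: the real record cache is size-bounded in bytes and also buffers writes until flush, whereas this version only models the read path with an entry-count bound.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

// Toy sketch of the caching idea behind enableCaching(): an LRU cache
// checked before the underlying (RocksDB-like) store is accessed.
// NOT Kafka's actual cache; illustration only.
class LruCache[K, V](maxEntries: Int, loadFromStore: K => V) {
  // accessOrder = true makes iteration order least-recently-used first
  private val cache = new JLinkedHashMap[K, V](16, 0.75f, true) {
    override def removeEldestEntry(eldest: Entry[K, V]): Boolean =
      size() > maxEntries // evict LRU entry once over capacity
  }

  // Hit the in-memory cache first; fall back to the store on a miss.
  def get(key: K): V = {
    if (!cache.containsKey(key)) cache.put(key, loadFromStore(key))
    cache.get(key)
  }
}
```

With a capacity of 2, reading a, a, b, c, a hits the backing store 4 times: the second read of a is served from memory, and the read of c evicts a.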
With regard to:
How do these 4 values affect each other? Does each window have a max number of elements - windowSize? Once it is reached, is a new window started? And can each window be divided into up to numSegments files on disk for RocksDB? Does duplicate mean the same key and value, and is it detected only within the same window?
I haven't looked at these internals recently enough to remember exactly. I can say the following though: when caching is enabled on a KTable
, the KTable
only forwards messages to its children when the topology commits and the store flushes. For the windowing questions, look at RocksDBSegmentedBytesStore
and its dependency Segments
.
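From what I recall of those classes, retentionPeriod and numSegments interact roughly as below: the retention period is split across the segments, and a record's timestamp maps to a segment file. Treat this as a hedged sketch of the Segments logic, not a faithful copy; in particular the minimum segment interval value used here is an assumption.

```scala
// Hedged sketch of the segment math in Kafka Streams' internal Segments
// class: retentionPeriod is divided across numSegments RocksDB files.
// The 60 s floor is an assumption, not a verified constant.
val MinSegmentInterval: Long = 60 * 1000L

def segmentInterval(retentionPeriod: Long, numSegments: Int): Long =
  math.max(retentionPeriod / (numSegments - 1), MinSegmentInterval)

// A record's timestamp determines which segment file it lands in;
// whole segments older than the retention period can then be dropped.
def segmentId(timestamp: Long, retentionPeriod: Long, numSegments: Int): Long =
  timestamp / segmentInterval(retentionPeriod, numSegments)
```

Under this sketch, with a 3-hour retention and 3 segments, each segment covers 1.5 hours, and expiring old data is cheap because Kafka Streams can delete an entire segment file instead of individual entries.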