Tags: apache-kafka, apache-kafka-streams

Missing data in a materialized state store directory


My application performs an aggregate on a KGroupedTable, and the result is materialized to a persistent key-value store.

I configured state.dir to a permanent path (not the default /tmp/kafka-streams).
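For reference, state.dir is set through the ordinary streams configuration. A minimal sketch (the path and application id below are placeholders, not values from my deployment):

```java
import java.util.Properties;

public class Config {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        // "state.dir" is the config key behind StreamsConfig.STATE_DIR_CONFIG
        props.put("state.dir", "/var/lib/kafka-streams"); // hypothetical permanent path
        props.put("application.id", "my-app");            // hypothetical application id
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("state.dir"));
    }
}
```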

I can see the key-value pairs stored as events in the changelog topic, and my application successfully reads them from the state store.

But when I inspect the state.dir directory, I find only metadata there. No data files (in particular, no .sst files) are present, even though I am specifying a persistent store.

Why is that? Where can I find the data? It looks like the behavior of an in-memory store.

The Materialized looks like:

val storeSupplier = Stores.persistentTimestampedKeyValueStore(name)
Materialized.as(storeSupplier)(KeySerde, ValueSerde)

which I then pass to aggregate.

Note: on my local machine, using a TopologyTestDriver, I can see .sst files. My question concerns a real deployment of the application.


Solution

  • Update

    The initial answer I provided (below) is wrong.

The answer to the question is that Kafka Streams does not flush state stores on every commit (since version 2.7.0).

An additional condition must be satisfied for the flush to be triggered: the delta between the offsets in the changelog topic and those recorded in the checkpoint file must exceed a threshold (currently 10,000 records). Only when that condition holds are the state stores flushed and a new checkpoint persisted.

For additional information, see this Jira ticket: https://issues.apache.org/jira/browse/KAFKA-9450

and the source code in AbstractTask.java:

if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, offsetSnapshotSinceLastFlush, offsetSnapshot)) {
    // the state's current offset would be used to checkpoint
    stateMgr.flush();
    stateMgr.checkpoint();
    offsetSnapshotSinceLastFlush = new HashMap<>(offsetSnapshot);
}
    

Recall that the checkpoint file stores an offset from the changelog topic such that all records before that offset are guaranteed to be persisted on disk in the state store.
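The gating idea can be sketched in plain Java. This is an illustrative model, not the real StateManagerUtil code: the method name, map shapes, and delta computation are simplified assumptions; only the 10,000-record threshold comes from the Jira ticket above.

```java
import java.util.Map;

public class CheckpointGate {
    // Illustrative threshold: flush/checkpoint only once the changelog offsets
    // have advanced this far past the last checkpointed offsets (per KAFKA-9450).
    static final long OFFSET_DELTA_THRESHOLD = 10_000L;

    // Simplified stand-in for StateManagerUtil.checkpointNeeded:
    // sums, across partitions, how far current offsets are past the checkpoint.
    static boolean checkpointNeeded(boolean enforce,
                                    Map<String, Long> lastCheckpointed,
                                    Map<String, Long> current) {
        if (enforce) {
            return true; // e.g. on shutdown, checkpoint unconditionally
        }
        long delta = 0;
        for (Map.Entry<String, Long> e : current.entrySet()) {
            delta += e.getValue() - lastCheckpointed.getOrDefault(e.getKey(), 0L);
        }
        return delta > OFFSET_DELTA_THRESHOLD;
    }

    public static void main(String[] args) {
        // 500 new records since the last checkpoint: no flush yet.
        System.out.println(checkpointNeeded(false, Map.of("p0", 0L), Map.of("p0", 500L)));
        // 20,000 new records: threshold exceeded, flush and checkpoint.
        System.out.println(checkpointNeeded(false, Map.of("p0", 0L), Map.of("p0", 20_000L)));
    }
}
```

This explains the observation in the question: a small store can sit well under the threshold indefinitely, so nothing is ever flushed to disk even though the changelog topic is fully populated.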

    Initial answer

    This is actually a RocksDB detail.

When Kafka Streams flushes data to RocksDB (either when commit.interval.ms has elapsed or when the cache is full), RocksDB writes the data to an active memtable, as well as to a transaction log (which, in Kafka Streams, is configured to be the changelog topic).

Only when the memtable is full (16 MB under Kafka Streams' default configuration) does it become read-only, and only at that point can it be written out to .sst files.

As long as the store's size is smaller than the memtable size, you will not see any .sst files in state.dir.
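That write path can be modeled with a toy sketch. This is not RocksDB code, just an illustration of the rule "writes accumulate in a memtable, and an .sst file only appears once the memtable crosses its size limit" (16 MB being the Kafka Streams default mentioned above):

```java
import java.util.ArrayList;
import java.util.List;

public class MemtableSketch {
    // Toy model of RocksDB's write path: records land in an in-memory memtable,
    // and an .sst file is only produced once the memtable exceeds its size limit.
    static final long MEMTABLE_LIMIT_BYTES = 16L * 1024 * 1024; // 16 MB default

    long memtableBytes = 0;
    List<Long> sstFileSizes = new ArrayList<>();

    void put(long recordBytes) {
        memtableBytes += recordBytes;
        if (memtableBytes >= MEMTABLE_LIMIT_BYTES) {
            // memtable becomes immutable and is written out as an .sst file
            sstFileSizes.add(memtableBytes);
            memtableBytes = 0;
        }
    }

    public static void main(String[] args) {
        MemtableSketch store = new MemtableSketch();
        for (int i = 0; i < 1000; i++) store.put(1024);   // ~1 MB: below the limit
        System.out.println(store.sstFileSizes.size());    // still no .sst files
        for (int i = 0; i < 20_000; i++) store.put(1024); // push past 16 MB
        System.out.println(store.sstFileSizes.size());    // first .sst appears
    }
}
```

A small aggregate store in a real deployment can easily stay under 16 MB forever, which is consistent with seeing only metadata in state.dir.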

I found the following article helpful for understanding RocksDB internals: https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance/