Tags: apache-kafka, apache-kafka-streams, ktable

Getting an Out of Memory exception, possibly due to the KTable-related state store


We have a Kafka Streams app doing a KStream-KTable inner join. Both topics are high-volume, with 256 partitions each. The app is currently deployed on 8 nodes with an 8 GB heap each. We see that the memory usage keeps growing and eventually OOM happens. I am not able to get a heap dump because the app runs in a container that gets killed when that happens. However, I have tried a few things that give me confidence the problem is related to the state stores/KTable. Without the RocksDBConfigSetter below, the memory gets used up pretty quickly; with it, the growth is slowed to some extent. I need some guidance on how to proceed further, thanks.

I added the three properties below:

properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1 * 1024L);
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);


public static class CustomRocksDBConfig implements RocksDBConfigSetter {

    // 1 MB block cache per config-setter instance (Kafka Streams creates one instance per store)
    private final org.rocksdb.Cache cache = new org.rocksdb.LRUCache(1 * 1024L * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        log.info("In CustomRocksDBConfig");

        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(1 * 1024L);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);

        options.setMaxWriteBufferNumber(2);
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close();
    }
}

Solution

  • You could try to limit the memory usage of RocksDB across all RocksDB instances on one node. To do so, you must configure RocksDB to cache the index and filter blocks in the block cache, limit the memtable memory through a shared WriteBufferManager and count its memory against the block cache, and then pass the same Cache object to each instance. You can find more details and a sample configuration at

    https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html#rocksdb

    With such a setup you can specify a soft upper bound for the total off-heap memory used by all RocksDB state stores on a single instance (TOTAL_OFF_HEAP_MEMORY in the sample configuration) and then specify how much of that memory is used for writing to and reading from the state stores on a single node (TOTAL_MEMTABLE_MEMORY and INDEX_FILTER_BLOCK_RATIO in the sample configuration, respectively). A sketch based on that sample configuration is included at the end of this answer.

    Since all values are app- and workload-specific, you need to experiment with them and monitor the RocksDB state stores with the metrics provided by Kafka Streams (the snippet at the end of this answer shows how to enable them).

    Guidance on how to handle RocksDB issues in Kafka Streams can be found at:

    https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance/

    Especially for your case, the following section might be interesting:

    https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance/#high-memory-usage
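
    As a starting point, here is a minimal sketch based on the bounded-memory sample configuration in the Kafka docs linked above. The constant values are illustrative placeholders, not recommendations, and the class uses the same imports as your CustomRocksDBConfig (org.rocksdb.Options, org.rocksdb.BlockBasedTableConfig, RocksDBConfigSetter, java.util.Map):

    public static class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

        // Placeholder sizes; tune them for your nodes and workload.
        private static final long TOTAL_OFF_HEAP_MEMORY = 512 * 1024 * 1024L;   // e.g. 512 MB of RocksDB memory per instance
        private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;             // share of the cache reserved for index/filter blocks
        private static final long TOTAL_MEMTABLE_MEMORY = 128 * 1024 * 1024L;   // e.g. 128 MB for all memtables combined

        // Shared by all stores on this instance, hence static.
        private static final org.rocksdb.Cache cache =
                new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
        private static final org.rocksdb.WriteBufferManager writeBufferManager =
                new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);

        @Override
        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
            final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

            // These three settings together bound RocksDB's memory to the block cache size.
            tableConfig.setBlockCache(cache);
            tableConfig.setCacheIndexAndFilterBlocks(true);
            options.setWriteBufferManager(writeBufferManager);

            // Recommended companions when bounding memory.
            tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
            tableConfig.setPinTopLevelIndexAndFilter(true);

            options.setTableFormatConfig(tableConfig);
        }

        @Override
        public void close(final String storeName, final Options options) {
            // Do NOT close the cache or write buffer manager here:
            // they are shared by every store on this instance.
        }
    }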
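
    Regarding monitoring: most state-store and RocksDB metrics are only recorded when the metrics recording level is lowered to DEBUG (the RocksDB metrics were added in Kafka Streams 2.4). Assuming the same properties object as in your question, that would be:

    // State-store and RocksDB metrics are skipped at the default INFO recording level.
    properties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");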