apache-kafka apache-kafka-streams

Kafka Streams: RocksDBConfigSetter parameterization


If we have a class implementing RocksDBConfigSetter, how do we pass parameter values (e.g. blockCacheSize, blockSize, writeBufferSize, maxWriteBufferNumber) to that class?

streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MyRocksDBConfig.class);

Also, is there any difference between setting the withCachingDisabled() on the storeBuilder versus withCachingEnabled() but setting the CACHE_MAX_BYTES_BUFFERING_CONFIG to 0?


Solution

  • RocksDBConfigSetter is described in the docs: https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter

    import java.util.Map;
    
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Options;
    
    public static class CustomRocksDBConfig implements RocksDBConfigSetter {
    
       @Override
       public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
         // Use a block-based table format and tune its cache and block sizes
         final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
         tableConfig.setBlockCacheSize(16 * 1024 * 1024L); // 16 MB block cache
         tableConfig.setBlockSize(16 * 1024L);             // 16 KB blocks
         tableConfig.setCacheIndexAndFilterBlocks(true);
         options.setTableFormatConfig(tableConfig);
         options.setMaxWriteBufferNumber(2);               // at most two memtables per store
       }
    }
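As for passing parameter values to the setter class: the `configs` map handed to `setConfig` contains the Streams configuration, so custom properties you put into the `StreamsConfig` can be read back inside the setter. A minimal sketch of that pattern (the property names `my.rocksdb.*` and the helper `longConfig` are made up for illustration):

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class ParameterizedRocksDBConfig implements RocksDBConfigSetter {

  // Hypothetical custom property names; pick keys that do not clash
  // with real StreamsConfig settings.
  public static final String BLOCK_CACHE_SIZE = "my.rocksdb.block.cache.size";
  public static final String WRITE_BUFFER_SIZE = "my.rocksdb.write.buffer.size";

  // Read a long-valued property from the configs map, falling back to a default.
  public static long longConfig(final Map<String, Object> configs,
                                final String key, final long defaultValue) {
    final Object value = configs.get(key);
    return value == null ? defaultValue : Long.parseLong(value.toString());
  }

  @Override
  public void setConfig(final String storeName, final Options options,
                        final Map<String, Object> configs) {
    final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
    tableConfig.setBlockCacheSize(longConfig(configs, BLOCK_CACHE_SIZE, 16 * 1024 * 1024L));
    options.setTableFormatConfig(tableConfig);
    options.setWriteBufferSize(longConfig(configs, WRITE_BUFFER_SIZE, 4 * 1024 * 1024L));
  }
}
```

The values are then supplied alongside the setter class itself, e.g. `streamsConfig.put(ParameterizedRocksDBConfig.BLOCK_CACHE_SIZE, "33554432");`.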
    

    Also, is there any difference between setting the withCachingDisabled() on the storeBuilder versus withCachingEnabled() but setting the CACHE_MAX_BYTES_BUFFERING_CONFIG to 0?

    As of Kafka version 2.0 (and older versions), yes. If caching is enabled, the caching-layer code is executed, while with caching disabled the caching layer is stripped out and thus never executed. If you set the buffer size to zero with caching enabled, every write only triggers an immediate eviction from the cache. Thus, from a practical point of view, there is not much difference.
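For reference, the two configurations being compared look roughly like this (the store name, key/value serdes, and variable names are illustrative):

```java
// (1) Strip the caching layer from this store entirely.
StoreBuilder<KeyValueStore<String, Long>> withoutCache =
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("counts-store"),
            Serdes.String(), Serdes.Long())
        .withCachingDisabled();

// (2) Keep the caching layer but give it zero bytes globally,
//     which forces an immediate eviction on every write.
Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
```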

    Note that this will be fixed in the next release: https://issues.apache.org/jira/browse/KAFKA-6998