apache-kafka stream apache-kafka-streams rocksdb

Can I empty the local Kafka state store?


Currently I have 3 Kafka brokers with 150 partitions. I also have 3 consumers, each of which is assigned a group of partitions. Each consumer has its own local state store backed by RocksDB. This in-memory key-value store is queried during gRPC calls. During rebalancing (for example, if a consumer disappears), the data is written to the local stores of the other consumers.

After the consumers have been running for around 2 weeks, the services seem to run out of memory. Is there a way to keep the local storage from growing too much? Can we remove the data of partitions that are no longer needed? Or is there a way to remove the stored data after the consumer is restored?


Solution

  • You can call the cleanUp() method before starting, or after shutting down, the Kafka Streams instance to clean up the local state stores.

    cleanUp()

    Does a clean up of the local StateStore by deleting all data with regard to the application ID. May only be called either before this KafkaStreams instance is started by calling start(), or after the instance is closed by calling close().

    KafkaStreams app = new KafkaStreams(builder.build(), props);
    // Delete the application's local state.
    // Note: In a real application you'd call `cleanUp()` only under
    // certain conditions. See the note on `cleanUp()` below.
    app.cleanUp();
    
    app.start();
    

    Note: To avoid the corresponding recovery overhead, do not call cleanUp() by default; call it only if you really need to. Otherwise you wipe out the local state and trigger an expensive state restoration. You won't lose data and the program will still be correct, but startup may slow down significantly (depending on the size of your state).
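One way to follow the note above is to make the wipe an explicit opt-in. The following is only a minimal sketch under stated assumptions: the class StateResetPolicy and the environment variable RESET_LOCAL_STATE are hypothetical names invented for illustration and are not part of Kafka Streams.

```java
import java.util.Map;

/** Hypothetical helper (not a Kafka Streams class): decides whether to wipe local state at startup. */
final class StateResetPolicy {
    // Assumed name of an opt-in switch; this is not a Kafka Streams setting.
    static final String RESET_FLAG = "RESET_LOCAL_STATE";

    private StateResetPolicy() {}

    /** Returns true only when the flag is explicitly set to "true" (case-insensitive). */
    static boolean shouldCleanUp(Map<String, String> env) {
        // A missing flag falls back to "false", so the default is to keep local state.
        return Boolean.parseBoolean(env.getOrDefault(RESET_FLAG, "false"));
    }
}
```

At startup, the application would then call app.cleanUp() only when StateResetPolicy.shouldCleanUp(System.getenv()) returns true, and call app.start() afterwards in either case. Keeping the default at "do not wipe" avoids the state restoration cost on ordinary restarts.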

    If you want to delete entries from the state store while the Kafka Streams application is running, you can simply remove them from the store; after all, it is just a key-value map backed by RocksDB.

    Assuming you are using the Kafka Streams Processor API:

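    // `context` is the ProcessorContext handed to the processor's init() method.
    KeyValueStore<String, String> dsStore =
            (KeyValueStore<String, String>) context.getStateStore("localstorename");
    // Close the iterator when done; an open store iterator holds resources.
    try (KeyValueIterator<String, String> iter = dsStore.all()) {
        while (iter.hasNext()) {
            KeyValue<String, String> entry = iter.next();
            dsStore.delete(entry.key);
        }
    }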
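The loop above empties the whole store. If only entries that are no longer needed should go, the same iterate-and-delete pattern can be made selective. The sketch below is an illustration under assumptions, not the Kafka Streams API: a plain java.util.Map stands in for the state store, the hypothetical class StaleEntryPurger is invented here, and it assumes each value records a last-update timestamp in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: a plain Map stands in for the state store in this sketch. */
final class StaleEntryPurger {
    private StaleEntryPurger() {}

    /**
     * Deletes every entry whose last-update timestamp is older than cutoffMs
     * and returns the number of deleted entries.
     */
    static int purgeOlderThan(Map<String, Long> lastUpdatedMs, long cutoffMs) {
        // Collect the stale keys first so the map is not modified while it is iterated.
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastUpdatedMs.entrySet()) {
            if (entry.getValue() < cutoffMs) {
                stale.add(entry.getKey());
            }
        }
        // Second pass: delete the collected keys.
        for (String key : stale) {
            lastUpdatedMs.remove(key);
        }
        return stale.size();
    }
}
```

With a real KeyValueStore, the first pass would read from the iterator returned by all() and the second pass would call delete(key). One option is to run such a purge periodically from a punctuator registered through context.schedule(...), so the store is trimmed while the application keeps running.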