apache-kafka-streams, spring-cloud-stream, spring-cloud-stream-binder-kafka

Check if StateStore is fully populated


I have a compacted topic with approximately 30 million keys. My app materializes this topic into a KeyValueStore.

How can I check whether the KeyValueStore is completely populated? When I look up a key via Interactive Queries, I need to know whether the key is missing because the state store is not ready yet or because the key is genuinely not present.

I materialize the StateStore this way:


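  import java.util.function.Consumer;
  import org.apache.kafka.common.utils.Bytes;
  import org.apache.kafka.streams.kstream.KTable;
  import org.apache.kafka.streams.kstream.Materialized;
  import org.apache.kafka.streams.state.KeyValueStore;
  import org.springframework.context.annotation.Bean;

  @Bean
  public Consumer<KTable<Key, Value>> process() {
    return stream -> stream.filter((k, v) -> v != null,  // drop records with null values
        Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")  // queryable store name
            .withKeySerde(new KeySerde())
            .withValueSerde(new ValueSerde()));
  }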

Solution

  • In general, there is no such thing as "fully loaded": at any point after the application has started, new data might be written to the input topic, and that data would then be read to update the corresponding table.

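    What you can do is monitor consumer lag: within your application, KafkaStreams#metrics() gives you access to all client (i.e., consumer and producer) metrics as well as the Kafka Streams metrics. The consumer exposes a metric called records-lag-max, the maximum lag in number of records for any of its partitions in the current metrics window, which can tell you when the table has caught up with the topic.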

    Of course, during normal processing (assuming new data is written to the input topic continuously), consumer lag will keep going up and down.
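A minimal sketch of a lag-based readiness check, assuming the records-lag-max values have already been pulled out of KafkaStreams#metrics(). The class LagCheck, the method caughtUp and the maxAllowedLag threshold are hypothetical names for illustration; the Kafka-specific collection code is kept in a comment so that the sketch compiles without Kafka on the classpath.

```java
import java.util.Collection;

// Hypothetical helper: decides whether consumption has caught up, given the
// values of the consumer metric "records-lag-max".
final class LagCheck {

    private LagCheck() {}

    // Returns true if at least one finite lag value is present and every finite
    // value is <= maxAllowedLag. NaN/infinite values (no samples in the current
    // metrics window) are skipped because they carry no lag information.
    static boolean caughtUp(Collection<Double> recordsLagMax, double maxAllowedLag) {
        boolean sawValue = false;
        for (Double lag : recordsLagMax) {
            if (lag == null || !Double.isFinite(lag)) {
                continue;
            }
            sawValue = true;
            if (lag > maxAllowedLag) {
                return false;
            }
        }
        return sawValue;
    }

    // Collecting the values in a real application (needs kafka-streams on the
    // classpath; "streams" is assumed to be your KafkaStreams instance):
    //
    //   List<Double> lags = new ArrayList<>();
    //   for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
    //       Object v = e.getValue().metricValue();
    //       if ("records-lag-max".equals(e.getKey().name()) && v instanceof Double) {
    //           lags.add((Double) v);
    //       }
    //   }
    //   boolean ready = LagCheck.caughtUp(lags, 0);
}
```

For example, caughtUp(List.of(0.0, 2.0), 5.0) returns true, while caughtUp(List.of(0.0, 12.0), 5.0) returns false. Skipping non-finite values is a design choice: a consumer that has recorded no samples in the current window may report NaN, and treating that as "behind" could keep the check false indefinitely.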
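Because lag fluctuates during normal processing, one option (an assumption on my part, not something Kafka Streams provides) is to treat readiness as a one-way latch: once lag has first dropped below a threshold, the store counts as populated, and lookups can then distinguish a missing key from a store that is still loading. In this sketch GatedLookup, Status and Result are hypothetical names; caughtUp stands in for a records-lag-max check, and storeGet stands in for ReadOnlyKeyValueStore#get, which returns null for a missing key.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

// Hypothetical wrapper: a one-way readiness latch plus a lookup that separates
// "store not ready yet" from "key really absent".
final class GatedLookup<K, V> {

    enum Status { NOT_READY, ABSENT, PRESENT }

    // Lookup result: a status plus the value when one was found.
    static final class Result<T> {
        final Status status;
        final Optional<T> value;

        Result(Status status, Optional<T> value) {
            this.status = status;
            this.value = value;
        }
    }

    private final AtomicBoolean ready = new AtomicBoolean(false);
    private final BooleanSupplier caughtUp;  // e.g. a records-lag-max threshold check
    private final Function<K, V> storeGet;   // e.g. key -> store.get(key)

    GatedLookup(BooleanSupplier caughtUp, Function<K, V> storeGet) {
        this.caughtUp = caughtUp;
        this.storeGet = storeGet;
    }

    // Latches: once the lag check has passed, later lag spikes during normal
    // processing do not flip the store back to "not ready".
    boolean isReady() {
        if (!ready.get() && caughtUp.getAsBoolean()) {
            ready.set(true);
        }
        return ready.get();
    }

    Result<V> lookup(K key) {
        if (!isReady()) {
            return new Result<V>(Status.NOT_READY, Optional.empty());
        }
        V v = storeGet.apply(key);
        if (v == null) {
            return new Result<V>(Status.ABSENT, Optional.empty());
        }
        return new Result<V>(Status.PRESENT, Optional.of(v));
    }
}
```

The latch is deliberately optimistic: after a rebalance the local store may have to be restored again, and obtaining the store itself can fail with InvalidStateStoreException while it is unavailable, so a production version would likely need to reset the latch or handle that exception.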