apache-kafka, apache-kafka-streams

How to test a WindowStore retention period?


I'm trying to deduplicate incoming Kafka messages. I'm polling a data source that makes all of a given day's data points available the next day, but at an inconsistent time, so I poll every x minutes and I want to deduplicate the data points to have a clean downstream topic containing only the new points.

For that I've built a custom transformer that relies on a state store to keep track of which points have already been processed. Since the data point's datetime is part of the deduplication key, I have an unbounded set of keys, so I cannot rely on a simple KeyValueStore. My understanding is that a WindowStore lets me keep keys only for a specific retention period (2 days, in my case), so that's what I'm using.

I tried to test the deduplication using kafka-streams-test-utils. The deduplication itself works well enough, but the WindowStore does not seem to "forget" the keys. I tried with a shorter window size and retention (1 s), but I'm still not able to make it forget the keys/values that are past the retention period.

Configuration of the store (I expect objects to stay in the store for ~2 seconds):

config.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,"1");
...
final StoreBuilder<WindowStore<String, AvroBicycleCount>> deduplicationStoreBuilder = Stores.windowStoreBuilder(
            Stores.persistentWindowStore(deduplicationStore, Duration.ofSeconds(1), Duration.ofSeconds(1), false),
            Serdes.String(),
            StreamUtils.AvroSerde()
);

My transformer logic

@Override
public DataPoint transform(final String dataId, final DataPoint incoming) {
    String key = dataId+"_"+incoming.getDateTime();
    DataPoint previous = windowStore.fetch(key, incoming.getDateTime());
    if(previous != null)
        return null;
    
    windowStore.put(key, incoming, incoming.getDateTime());
    return incoming;
}

The third test fails:

inputTopic.pipeInput("a", newDataPoint);
assertEquals(1, outputTopic.readRecordsToList().size(), "When a new data is emitted, it should go through");
    
inputTopic.pipeInput("a", newDataPoint);
assertEquals(0, outputTopic.readRecordsToList().size(), "When the same data is re-emitted, it should not go through");
    
TimeUnit.SECONDS.sleep(10);

inputTopic.pipeInput("a", newDataPoint);
assertEquals(1, outputTopic.readRecordsToList().size(), "When the same data is re-emitted well past the retention period, it should go through");
    

Is there something I'm not understanding correctly about the WindowStore's retention?


Solution

  • A WindowStore uses so-called segments internally to expire data. That is, the retention time range is split into smaller time ranges, and there is one segment per time range to store the corresponding data (internally, a segment maps to a store, i.e., a WindowStore is actually multiple stores internally). If all records in a segment are expired, the whole segment is dropped by deleting the corresponding store (this is more efficient than record-by-record expiration).

    Also, there is a minimum (hard-coded) segment size of 60 seconds, and the number of segments is 2 (also hard-coded), to avoid too small (and inefficient) segments. Thus, for your case of a 2-day retention time, you get two segments with a time range of 1 day each, so data (at the beginning of a segment) can be up to 3 days old before the old segment is dropped (the arithmetic is sketched below).

    Thus, data is effectively deleted only with some delay. You cannot configure the number of segments.
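
    Put concretely, the effective segment interval works out roughly as follows. This is only a sketch of the arithmetic described above, not the literal Kafka Streams internals; the constants are the ones stated in the explanation:

    long retentionMs = Duration.ofDays(2).toMillis();  // the intended retention
    long minSegmentIntervalMs = 60_000L;                // hard-coded minimum segment size
    long numSegments = 2;                               // hard-coded number of segments

    long segmentIntervalMs = Math.max(retentionMs / numSegments, minSegmentIntervalMs);
    // => 1 day per segment for a 2-day retention; a record written at the very
    //    start of a segment can survive up to roughly retention + segmentInterval,
    //    i.e. about 3 days, before its whole segment is dropped.
    // => for the 1-second retention used in the test, the 60-second minimum
    //    applies, so nothing can possibly be dropped after only a few seconds.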
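
    Assuming segment dropping is driven by the timestamps written into the store (its "stream time") rather than by wall-clock time — which is why the sleep in the test has no effect — here is a minimal sketch of how the failing test could be adapted, keeping the intended 2-day retention and using a hypothetical newDataPointAt(epochMillis) helper that builds a DataPoint whose datetime is the given timestamp:

    long t0 = 0L;

    inputTopic.pipeInput("a", newDataPointAt(t0));
    assertEquals(1, outputTopic.readRecordsToList().size(),
            "a new data point should go through");

    inputTopic.pipeInput("a", newDataPointAt(t0));
    assertEquals(0, outputTopic.readRecordsToList().size(),
            "a duplicate should be filtered out");

    // Advance the store's stream time well past retention (2 days) plus one
    // segment interval (1 day) by writing a point with a much newer datetime,
    // so the segment holding the first point gets dropped.
    inputTopic.pipeInput("a", newDataPointAt(t0 + Duration.ofDays(4).toMillis()));
    outputTopic.readRecordsToList(); // drain the record emitted for the newer point

    inputTopic.pipeInput("a", newDataPointAt(t0));
    assertEquals(1, outputTopic.readRecordsToList().size(),
            "once its segment has been dropped, the old point should go through again");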