In Flink, I have a Job with a Keyed Stream of events (e.g.: 10 events for each Key every day on average). They are handled as Sliding Windows based on Event-Time (e.g.: 90-days window size and 1-day window slide). Events are consumed from Kafka, which persists all event history (e.g.: last 3 years).
Sometimes I'd like to restart Flink: for maintenance, bug handling, etc. Or start a fresh Flink instance with Kafka already containing event history.
In such a case, I would like to skip triggering all but the most recent window for each key. (This is specific to my use case: each window, when processed, effectively overrides the processing results of previous windows. So at startup, I would like to process only the single most recent window for each key.)
Is it possible in Flink? If so, then how to do it?
You can use
FlinkKafkaConsumer<T> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
which is described along with other related functions in the section of the docs on Kafka Consumers Start Position Configuration.
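For your use case, the timestamp to pass in would be the start of the most recent window, i.e. roughly "now minus the window size" (90 days). A minimal sketch of computing that value (the `startTimestampMillis` helper and the `Event`/topic names in the comments are hypothetical; the `setStartFromTimestamp` call is the legacy `FlinkKafkaConsumer` API):

```java
import java.time.Duration;
import java.time.Instant;

public class StartOffsetCalc {
    // Compute the earliest event time still covered by the most recent
    // sliding window: current time minus the window size (e.g. 90 days).
    static long startTimestampMillis(Instant now, Duration windowSize) {
        return now.minus(windowSize).toEpochMilli();
    }

    public static void main(String[] args) {
        long ts = startTimestampMillis(Instant.now(), Duration.ofDays(90));
        System.out.println(ts);
        // Then wire it into the consumer, e.g. (hypothetical topic/schema/props):
        // FlinkKafkaConsumer<Event> myConsumer =
        //         new FlinkKafkaConsumer<>("events", schema, props);
        // myConsumer.setStartFromTimestamp(ts);
    }
}
```

Note that events older than the cutoff are simply never read, so the earlier windows are never assembled or triggered at all, which avoids replaying three years of history.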
Or, for the planned-restart case, you could use a savepoint to do a clean upgrade/redeploy without losing your Kafka offsets and the associated window state, so the job resumes where it left off rather than re-triggering old windows.