Apache Flink - how to skip all but most recent window on startup


I have a Flink job with a keyed stream of events (on average, about 10 events per key per day). The events are processed in event-time sliding windows (for example, a 90-day window size with a 1-day slide). Events are consumed from Kafka, which retains the full event history (for example, the last 3 years).

Sometimes I need to restart Flink, for maintenance, bug fixes, and so on, or to start a fresh Flink instance when Kafka already contains the event history.

In such a case, I would like to skip triggering for all but the most recent window for each key. (This is specific to my use case: each window, when processed, effectively overrides the results of previous windows. So at startup, I would like to process only the single most recent window for each key.)

Is this possible in Flink? If so, how can I do it?


Solution

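  • You can tell the Kafka consumer to start reading from a given timestamp:

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    FlinkKafkaConsumer<T> myConsumer = new FlinkKafkaConsumer<>(...);
    myConsumer.setStartFromTimestamp(...); // start from the specified epoch timestamp (milliseconds)
    

    This method is described, along with the other start-position options, in the "Kafka Consumers Start Position Configuration" section of the docs.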

    Alternatively, you could take a savepoint before stopping the job and restore from it, which gives you a clean upgrade/redeploy without losing your Kafka offsets and the associated window contents.
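
As a rough illustration of how the timestamp passed to setStartFromTimestamp might be chosen for the 90-day / 1-day windows in the question, here is a minimal sketch. The class RecentWindowStart and the method startTimestampMillis are hypothetical helpers, not part of Flink. The sketch assumes that windows are aligned to the epoch with no offset and that Kafka record timestamps are close to the event times.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not part of Flink): picks a Kafka start timestamp for a restart. */
class RecentWindowStart {

    /**
     * Returns the start (epoch millis) of the most recent sliding window that has
     * already ended at `now`, assuming windows are aligned to the epoch with no offset.
     */
    static long startTimestampMillis(Instant now, Duration windowSize, Duration slide) {
        long nowMs = now.toEpochMilli();
        long slideMs = slide.toMillis();
        // End of the most recent completed window: `now` rounded down to a slide boundary.
        long lastWindowEndMs = nowMs - Math.floorMod(nowMs, slideMs);
        // That window covers [end - size, end), so reading must begin at end - size.
        return lastWindowEndMs - windowSize.toMillis();
    }

    public static void main(String[] args) {
        long startMs = startTimestampMillis(
                Instant.now(), Duration.ofDays(90), Duration.ofDays(1));
        System.out.println("Start consuming from: " + Instant.ofEpochMilli(startMs));
        // The value would then be passed to the consumer from the answer above:
        // myConsumer.setStartFromTimestamp(startMs);
    }
}
```

Replaying from this timestamp still assigns each event to every sliding window that contains it, so older overlapping windows may fire with partial data before the full window does. The sketch relies on the question's statement that each later window overrides the results of earlier ones.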