
How to set a TTL for all items in a MapState in Flink?


I would like to clean up all entries in a MapState at a given timestamp.

I am considering two ways to do this:

  1. Hold the cleanup timestamp in a ValueState, register a timer for the cleanup timestamp, and clear the MapState when the timer fires. Although the cleanup timestamp might be the same, the timer would be registered for every item added to the MapState; I am relying on Flink to de-duplicate the timers.
  2. Calculate the TTL as (cleanup timestamp - current timestamp) and use StateTtlConfig to set a TTL for the MapState.
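Option 1 could look roughly like the following minimal sketch, assuming the Flink 1.x DataStream API (`open(Configuration)` is deprecated in recent releases). The `Event` type, its fields (`key`, `mapKey`, `value`, `cleanupAt`) and the class name `MapCleanupFunction` are hypothetical. Flink keeps at most one timer per key and timestamp, so registering the same cleanup time for every element does not create duplicate timers.

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical function: collects map entries per key and clears them all
// when the event-time cleanup timestamp is reached.
public class MapCleanupFunction
        extends KeyedProcessFunction<String, MapCleanupFunction.Event, String> {

    // Hypothetical input POJO (public fields + no-arg constructor for Flink's POJO serializer).
    public static class Event {
        public String key;
        public String mapKey;
        public String value;
        public long cleanupAt; // event-time cleanup timestamp, epoch millis

        public Event() {}

        public Event(String key, String mapKey, String value, long cleanupAt) {
            this.key = key;
            this.mapKey = mapKey;
            this.value = value;
            this.cleanupAt = cleanupAt;
        }
    }

    private transient MapState<String, String> entries;
    private transient ValueState<Long> cleanupTime;

    @Override
    public void open(Configuration parameters) {
        entries = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("entries", Types.STRING, Types.STRING));
        cleanupTime = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cleanupTime", Types.LONG));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
        entries.put(event.mapKey, event.value);
        cleanupTime.update(event.cleanupAt);
        // Registering the same timestamp again for the same key is de-duplicated by Flink.
        ctx.timerService().registerEventTimeTimer(event.cleanupAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long scheduled = cleanupTime.value();
        // Ignore timers left over from an earlier cleanup timestamp for this key.
        if (scheduled != null && scheduled == timestamp) {
            entries.clear();
            cleanupTime.clear();
            out.collect("cleared " + ctx.getCurrentKey());
        }
    }
}
```

The `scheduled == timestamp` check matters only if the cleanup timestamp for a key can change: the timer for the old timestamp still fires, but it no longer clears anything.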

Which is the better approach (performance, accuracy, etc.)? Does StateTtlConfig work with event-time processing?


Solution

  • If your intention is to clear out all entries in the MapState at the same time, then I would not use StateTtlConfig, since Flink will spend 8 bytes to store a timestamp with each map entry. That is a lot of unnecessary storage overhead when a single timer per key would do the job.

    As for event time: with StateTtlConfig, state expiry can only be specified in terms of processing time, so it does not fit event-time processing.

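    Also keep in mind that StateTtlConfig cannot be added to, or removed from, the descriptor of state that already exists: restoring state that was written without TTL using a TTL-enabled descriptor (or vice versa) fails with a StateMigrationException.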
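For comparison, option 2 would be configured roughly as in this minimal sketch, assuming the Flink 1.x `StateTtlConfig.newBuilder(Time)` builder (recent releases deprecate `Time` in favour of `java.time.Duration`). The class name `TtlMapStateExample`, the state name and the one-hour TTL are hypothetical. The TTL is a single fixed duration attached to the descriptor and is measured in processing time from each entry's own creation or last write, so a per-item value such as (cleanup timestamp - current timestamp) cannot be expressed with it.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

public class TtlMapStateExample {

    // Builds a MapState descriptor whose entries expire one hour (processing time)
    // after they were created or last written. The duration is fixed here, once,
    // for every entry of this state.
    public static MapStateDescriptor<String, String> ttlDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(1))
                // Restart an entry's TTL on creation and on every write.
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                // Never hand expired entries back to user code.
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        MapStateDescriptor<String, String> descriptor =
                new MapStateDescriptor<>("entries", Types.STRING, Types.STRING);
        // Each map entry now carries its own timestamp alongside its value.
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```

In the question's scenario this means entries written at different times also expire at different times, rather than all together at the cleanup timestamp.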