apache-flink flink-streaming

How Does Flink Clean Up Keyed State?


When thinking about keying by something, I traditionally use the analogy of throwing all the events that match the key into the same bucket. As you can imagine, once a Flink application starts handling lots of data, what you choose to key by becomes important, because you want to make sure you clean up state well. This leads me to my question: how exactly does Flink clean up these "buckets"? If a bucket is empty (all of its MapStates and ValueStates are empty), does Flink close that area of the key space and delete the bucket?

Example:

Incoming Data Format: {userId, computerId, amountOfTimeLoggedOn}

Key: UserId/ComputerId

Current Key Space:

  • Alice, Computer 10: Has 2 events in it. Both events are stored in state.
  • Bob, Computer 11: Has no events in it. Nothing is stored in state.

Will Flink eventually remove Bob, Computer 11 from the key space, or does it live on forever because at one point it had an event in it?


Solution

  • Flink does not store any data for state keys which do not have any user value associated with them, at least in the existing state backends: Heap (in memory) or RocksDB.

    The key space is virtual in Flink: Flink makes no assumptions about which concrete keys can potentially exist. There are no pre-allocated buckets per key or per subset of keys. A key only occupies storage once the user application writes some value for it.

    The general idea is that all records with the same key are processed by the same parallel task, and therefore on the same machine (somewhat like being in the same bucket, as you say). The local state for a given key is also always kept on that machine, if it is stored at all. This partitioning is independent of checkpointing, though.
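The "same machine" guarantee comes from deterministic key partitioning: Flink hashes each key into one of `maxParallelism` key groups and assigns contiguous ranges of key groups to parallel operator instances. The sketch below approximates that routing in plain Java. As a simplifying assumption it uses `Math.floorMod(key.hashCode(), maxParallelism)` in place of the murmur-style hash Flink actually applies to `hashCode()`, so concrete group numbers will not match a real job; the class name `ToyKeyRouting` is hypothetical.

```java
// Simplified model of Flink-style key routing: key -> key group -> operator instance.
// Assumption: floorMod(hashCode) stands in for the hash Flink really applies.
final class ToyKeyRouting {
    private ToyKeyRouting() {}

    // Map a key to one of maxParallelism key groups (always in [0, maxParallelism)).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Map a key group to a parallel instance; contiguous ranges of groups share an instance.
    static int instanceFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Route a key straight to its operator instance index.
    static int route(Object key, int parallelism, int maxParallelism) {
        return instanceFor(keyGroupFor(key, maxParallelism), parallelism, maxParallelism);
    }
}
```

For example, with `maxParallelism = 128` and `parallelism = 4`, key groups 0 to 31 go to instance 0, 32 to 63 to instance 1, and so on. Because the mapping depends only on the key, every record for "Alice/Computer 10" reaches the same instance, which is why that instance is the only place her state needs to live.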

    For your example, if some value was written for [Bob, Computer 11] at some point and then subsequently removed, Flink removes it completely, together with the key; nothing is left behind for that key.
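Bob's lifecycle can be traced with a toy model in plain Java. This is not Flink's API or either backend's real implementation: it is a MapState-like structure whose per-key storage is created lazily on the first write and dropped when the last entry is removed or the key is cleared. In a real job the corresponding calls would be `MapState.put`, `MapState.remove` and `clear()` on the state handle, with the key implicit (the current key) rather than passed in; `ToyMapState` and its method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a keyed MapState: outer key -> (user key -> user value).
// Illustrates that keys cost nothing until written and leave nothing behind once emptied.
class ToyMapState<K, UK, UV> {
    private final Map<K, Map<UK, UV>> storage = new HashMap<>();

    // Storage for a key is created lazily, on its first write.
    void put(K key, UK userKey, UV userValue) {
        storage.computeIfAbsent(key, k -> new HashMap<>()).put(userKey, userValue);
    }

    // Remove one entry; if it was the last one, drop the key itself.
    void remove(K key, UK userKey) {
        Map<UK, UV> entries = storage.get(key);
        if (entries == null) {
            return; // nothing was ever stored for this key
        }
        entries.remove(userKey);
        if (entries.isEmpty()) {
            storage.remove(key);
        }
    }

    // Drop all entries for a key at once.
    void clear(K key) {
        storage.remove(key);
    }

    boolean hasKey(K key) {
        return storage.containsKey(key);
    }

    // Number of keys that currently occupy storage.
    int storedKeys() {
        return storage.size();
    }
}
```

Before Bob's first event `hasKey("Bob/Computer 11")` is false, after a `put` it is true, and after the matching `remove` it is false again, so the key no longer occupies any storage.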