apache-flink

Flink capped MapState


Background

We want to keep the last n unique IDs in a Flink operator's state. When the (n+1)th unique ID arrives, we want to store it and evict the oldest unique ID from the state. This keeps the state from growing without bound.

We already have a TTL (expiration time) mechanism in place. A size limit is an additional restriction we want to add.

Not every element carries a unique ID.

Question

Does Flink provide an API that limits the number of elements in the state?

Things tried

  1. Using MapState with a TTL/expiration mechanism configured via StateTtlConfig.
  2. Windowing, which limited the number of processed elements but not the number of elements held in state.

Solution

  • While this isn't directly provided, you could achieve this with MapState<Long, Event> plus a couple of additional ValueState<Long> values to keep track of the currently active range of indexes into the MapState.

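    As events arrive, update the state roughly like this. Here map is the MapState<Long, Event>, nextIndex and oldestIndex are the two ValueState<Long> handles, and n is the capacity:

    long next = nextIndex.value() == null ? 0L : nextIndex.value();
    long oldest = oldestIndex.value() == null ? 0L : oldestIndex.value();
    map.put(next++, thisEvent);          // store the newest event
    if (next - oldest > n) {
      map.remove(oldest++);              // evict the oldest event
    }
    nextIndex.update(next);
    oldestIndex.update(oldest);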
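The question asks for the last n *unique* IDs, so the index-range scheme also needs a way to avoid storing the same ID twice. The following is a minimal sketch in plain Java that runs without Flink and rests on several assumptions. The HashMaps stand in for Flink MapState, and the nullable Long fields stand in for the two ValueState<Long> values. The second map from ID to index is an assumed extra piece of state. A repeated ID is ignored rather than moved to the newest position. The names CappedIdState, onElement and retainedIds are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the index-range technique (hypothetical names).
// HashMaps stand in for Flink MapState; the nullable Long fields stand in
// for ValueState<Long>, whose value() returns null until the first update().
final class CappedIdState {
    private final int n;                                         // maximum number of IDs to retain
    private final Map<Long, String> idByIndex = new HashMap<>(); // models MapState<Long, String>
    private final Map<String, Long> indexById = new HashMap<>(); // assumed extra MapState for duplicate checks
    private Long nextIndex;                                      // models ValueState<Long>: next slot to write
    private Long oldestIndex;                                    // models ValueState<Long>: oldest live slot

    CappedIdState(int n) {
        if (n < 1) throw new IllegalArgumentException("n must be >= 1");
        this.n = n;
    }

    /** Called once per element; elements without an ID (null) are ignored. */
    void onElement(String id) {
        if (id == null || indexById.containsKey(id)) {
            return; // assumption: a repeated ID is neither re-added nor refreshed
        }
        long next = nextIndex == null ? 0L : nextIndex;
        long oldest = oldestIndex == null ? 0L : oldestIndex;

        idByIndex.put(next, id);
        indexById.put(id, next);
        next++;

        if (next - oldest > n) {
            // Evict the oldest entry from both maps. The index range has no gaps
            // because duplicates never consume an index.
            String evicted = idByIndex.remove(oldest);
            indexById.remove(evicted);
            oldest++;
        }
        nextIndex = next;     // corresponds to ValueState.update(...) in Flink
        oldestIndex = oldest;
    }

    /** Returns the retained IDs from oldest to newest. */
    List<String> retainedIds() {
        List<String> out = new ArrayList<>();
        if (nextIndex == null) return out;
        for (long i = oldestIndex; i < nextIndex; i++) {
            out.add(idByIndex.get(i));
        }
        return out;
    }
}
```

With n = 3, the input sequence "a", null, "b", "a", "c", "d", "e" leaves c, d and e in state. The null element and the repeated "a" are skipped, and "a" and "b" are evicted in arrival order. An evicted ID that arrives again is treated as new. In Flink, the duplicate check would map to MapState.contains on the reverse map. The reverse map doubles the number of state entries, but it avoids scanning the whole range on every element. The existing TTL configuration can still be applied to these state descriptors on top of the size cap.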