Tags: events, apache-flink, ttl, stateful

Apache Flink 1.6.0 - StateTtlConfig and ListState


I am in the process of implementing a proof-of-concept stream processing system using Apache Flink 1.6.0 and am storing a list of received events, partitioned by key, in a ListState. (Don't worry about why I am doing this, just work with me here.) I have a StateTtlConfig set on the corresponding ListStateDescriptor. Per the documentation:

  1. "All state collection types support per-entry TTLs. This means that list elements and map entries expire independently."
  2. "Currently, expired values are only removed when they are read out explicitly, e.g. by calling ValueState.value()."

Question 1

Which of the following constitutes a read of the ListState:

  1. Requesting the iterator but not using it - myListState.get();.
  2. Actually using the iterator - for (MyItem i : myListState.get()) { ... }

Question 2

What does "per-entry TTL" actually mean? Specifically, what I'm asking about is the following:

Assume I have a specific instance of ListState<Character>. The descriptor has a TTL of 10 seconds. I insert 'a'. Two seconds later, I insert 'b'. Nine seconds after that (11 seconds after 'a'), I insert 'c'. If I then iterate over this ListState, which items will be returned?

In other words:

ListState<Character> ls = getRuntimeContext().getListState(myDescriptor);

ls.add('a');

// ...two seconds later...
ls.add('b');

// ...nine seconds later...
ls.add('c');

// Does this iterate over 'a', 'b', 'c'
// or just 'b' and 'c'?
for (Character myChar : ls.get()) { ... }

Solution

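Answer 1

The answer is option 1. For ListState, expired entries are pruned by the call to myListState.get() itself; you do not need to iterate over the result.

Answer 2

"Per-entry TTL" means the timeout applies to each entry individually rather than to the collection as a whole. In your example, 'c' is inserted 11 seconds after 'a'. Assuming the read happens at that point, 'a' is older than the 10-second TTL and is pruned, while 'b' (9 seconds old) and 'c' are still live. The loop therefore iterates over 'b' and 'c' only.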
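
To make the timeline concrete, here is a minimal plain-Java sketch of per-entry expiry. It is not Flink code and does not reflect Flink's internals: TtlList, Entry and storedCount are hypothetical names made up for illustration, and the clock is passed in explicitly so the 0 s / 2 s / 11 s timeline can be replayed without sleeping. It assumes an entry expires once its age reaches the TTL.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a TTL-enabled list state; NOT Flink's implementation.
public class PerEntryTtlSketch {

    // Each element carries its own insertion timestamp (the "per-entry" part).
    static final class Entry<T> {
        final T value;
        final long insertedAtMs;

        Entry(T value, long insertedAtMs) {
            this.value = value;
            this.insertedAtMs = insertedAtMs;
        }
    }

    static final class TtlList<T> {
        private final long ttlMs;
        private final List<Entry<T>> entries = new ArrayList<>();

        TtlList(long ttlMs) {
            this.ttlMs = ttlMs;
        }

        // The caller supplies the current time (an assumption made for testability).
        void add(T value, long nowMs) {
            entries.add(new Entry<>(value, nowMs));
        }

        // A read prunes every entry whose own age has reached the TTL,
        // then returns the values that are still live.
        List<T> get(long nowMs) {
            entries.removeIf(e -> nowMs - e.insertedAtMs >= ttlMs);
            List<T> live = new ArrayList<>();
            for (Entry<T> e : entries) {
                live.add(e.value);
            }
            return live;
        }

        // Number of entries physically stored, expired or not.
        int storedCount() {
            return entries.size();
        }
    }

    public static void main(String[] args) {
        TtlList<Character> ls = new TtlList<>(10_000); // 10-second TTL
        ls.add('a', 0);        // t = 0 s
        ls.add('b', 2_000);    // t = 2 s
        ls.add('c', 11_000);   // t = 11 s
        System.out.println(ls.get(11_000)); // prints [b, c]
    }
}
```

Note that in this sketch 'a' stays in storage until the read at t = 11 s removes it, which mirrors the documentation's statement that expired values are only removed when they are read.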