I am implementing a proof-of-concept stream processing system using Apache Flink 1.6.0 and am storing a list of received events, partitioned by key, in a ListState. (Don't worry about why I am doing this, just work with me here.) I have a StateTtlConfig set on the corresponding ListStateDescriptor. Per the documentation:
"[...] ValueState.value()."
Question 1
Which of the following constitutes a read of the ListState?
1. myListState.get();
2. for (MyItem i : myListState.get()) { ... }
Question 2
What does "per-entry TTL" actually mean? Specifically, what I'm asking about is the following:
Assume I have a specific instance of ListState<Character>. The descriptor has a TTL of 10 seconds. I insert 'a'. Two seconds later, I insert 'b'. Nine seconds later, I insert 'c'. If I iterate over this ListState, which items will be returned?
In other words:
ListState<Character> ls = getRuntimeContext().getListState(myDescriptor);
ls.add('a');
// ...two seconds later...
ls.add('b');
// ...nine seconds later...
ls.add('c');
// Does this iterate over 'a', 'b', 'c'
// or just 'b' and 'c'?
for (Character myChar : ls.get()) { ... }
Answer 1
The answer is 1. For ListState, the pruning is done by the call to myListState.get().
Answer 2
"Per-entry TTL" means the timeout is applied to each entry individually rather than to the whole collection. In your example, assuming at least 10 seconds have passed since 'a' was inserted by the time you read the state, the iteration returns 'b' and 'c'; 'a' is pruned.
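To make the timeline concrete, here is a minimal plain-Java sketch of the per-entry idea. It is a toy model, not Flink's implementation: the class PerEntryTtlList and its methods are hypothetical names, time is passed in explicitly instead of being read from a clock, and treating an entry as expired once its age reaches the TTL is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of a list with per-entry TTL (hypothetical; not Flink's code). */
final class PerEntryTtlList<T> {

    /** One element plus the time at which it was written. */
    private static final class Entry<T> {
        final T value;
        final long writtenAtMillis;

        Entry(T value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final long ttlMillis;
    private final List<Entry<T>> entries = new ArrayList<>();

    PerEntryTtlList(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Appends a value and stamps it with its own write time. */
    void add(T value, long nowMillis) {
        entries.add(new Entry<>(value, nowMillis));
    }

    /** Drops expired entries, then returns the survivors in insertion order. */
    List<T> get(long nowMillis) {
        List<T> live = new ArrayList<>();
        Iterator<Entry<T>> it = entries.iterator();
        while (it.hasNext()) {
            Entry<T> e = it.next();
            // Assumption: an entry expires once its age reaches the TTL.
            if (nowMillis - e.writtenAtMillis >= ttlMillis) {
                it.remove(); // prune this entry only, not the whole list
            } else {
                live.add(e.value);
            }
        }
        return live;
    }

    public static void main(String[] args) {
        PerEntryTtlList<Character> ls = new PerEntryTtlList<>(10_000L);
        ls.add('a', 0L);      // t = 0 s
        ls.add('b', 2_000L);  // two seconds later
        ls.add('c', 11_000L); // nine seconds after that
        System.out.println(ls.get(11_000L)); // prints [b, c]
    }
}
```

Because each entry carries its own timestamp, adding 'c' does not refresh 'a'. With a whole-collection TTL, a single timestamp would cover all three elements and they would expire together.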