Consider the following example class, instances of which are stored in a ListState:
import java.time.Instant;

class BusinessObject {
    long clientId;
    String region;
    Instant lastDealDate;
    boolean isActive;
}
The application requires that such an object be removed from the Flink state once it has been 1 year since the last deal was made (lastDealDate) with a particular client and the client is not active, i.e. isActive == false.
What would be the proper way of going about this and letting Flink know of these two factors so that it removes those entries automatically? Currently, I read all the items in the state, clear the state, and then add back the relevant ones; however, this will start to take a long time as the number of clients increases and the state grows large. Most of my searches online talk about using time-to-live and setting it for my state via the descriptor. However, my logic can't rely on processing/event/ingestion time, and I also need to check whether isActive is false.
Extra info: the context is not keyed and the backend is RocksDB. The reason a ListState is used is that all of the relevant state/history, as per the above conditions, needs to be dumped every day.
Any suggestions?
With the RocksDB state backend, Flink can append to ListState without going through serialization/deserialization, but any read or modification other than an append is expensive because of ser/de.
You'll be better off if you can rework things so that these BusinessObjects are stored in MapState, even if you occasionally have to iterate over the entire map. Each key/value pair in the MapState will be a separate RocksDB entry, and you'll be able to individually create/update/delete them without having to go through ser/de for the entire map (unless you do have to scan it). (For what it's worth, iterating over MapState in RocksDB proceeds through the map in serialized-key-sorted order.)
MapState is only available as keyed (or broadcast) state, so this change would require you to key the stream. Using keyBy does force a network shuffle (and ser/de), so it will be expensive, but not as expensive as using ListState.
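To make the per-entry removal concrete, here is a minimal plain-Java sketch of the eviction rule from the question, applied to a java.util.Map standing in for the map state. It is not Flink code: the class name StatePruning, the helpers isExpired and prune, and the approximation of "1 year" as 365 days are all assumptions made for illustration. With Flink's MapState the same predicate could be evaluated while iterating over the entries, deleting each expired one individually (MapState offers iterator() and remove(key)), instead of clearing and rebuilding the whole state.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;

public class StatePruning {

    // Assumption: "1 year" is approximated as 365 days.
    static final Duration MAX_IDLE = Duration.ofDays(365);

    // Same fields as the class in the question, plus a constructor.
    static class BusinessObject {
        final long clientId;
        final String region;
        final Instant lastDealDate;
        final boolean isActive;

        BusinessObject(long clientId, String region, Instant lastDealDate, boolean isActive) {
            this.clientId = clientId;
            this.region = region;
            this.lastDealDate = lastDealDate;
            this.isActive = isActive;
        }
    }

    // True when the entry should be dropped: the client is inactive AND
    // at least MAX_IDLE has passed since the last deal.
    static boolean isExpired(BusinessObject bo, Instant now) {
        return !bo.isActive && !bo.lastDealDate.plus(MAX_IDLE).isAfter(now);
    }

    // Removes expired entries one at a time through the iterator and
    // returns how many were removed; all other entries are left untouched.
    static int prune(Map<Long, BusinessObject> state, Instant now) {
        int removed = 0;
        Iterator<Map.Entry<Long, BusinessObject>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            if (isExpired(it.next().getValue(), now)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

Keying the map by clientId means an update for a single client touches only that entry, and the daily dump is the one place where a full scan is still needed.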