
Pruning flink state based on attributes of stored object


Consider the following example class, instances of which are being stored in a ListState:

import java.time.Instant;

class BusinessObject {
  long clientId;
  String region;
  Instant lastDealDate;
  boolean isActive;
}

The application requires that such an object be removed from Flink state once at least one year has passed since the last deal with that client (lastDealDate) and the client is inactive, i.e. isActive == false.
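The removal condition itself can be written as a plain predicate. This is a minimal sketch, assuming the reference point for "one year" is an explicit `now` supplied by the caller (for example, the time of the daily dump) rather than Flink's processing or event time, and that years are counted in UTC. `Retention` and `isExpired` are hypothetical names; the class is repeated with a constructor so the snippet compiles on its own.

```java
import java.time.Instant;
import java.time.ZoneOffset;

// Same fields as the class in the question, plus a constructor so the
// snippet compiles and can be exercised on its own.
class BusinessObject {
    long clientId;
    String region;
    Instant lastDealDate;
    boolean isActive;

    BusinessObject(long clientId, String region, Instant lastDealDate, boolean isActive) {
        this.clientId = clientId;
        this.region = region;
        this.lastDealDate = lastDealDate;
        this.isActive = isActive;
    }
}

// Hypothetical helper; the name and the explicit `now` argument are assumptions.
class Retention {
    // An entry should be dropped when the client is inactive AND at least one
    // calendar year (measured in UTC) has passed since the last deal.
    static boolean isExpired(BusinessObject o, Instant now) {
        // Instant.plus(1, ChronoUnit.YEARS) throws UnsupportedTemporalTypeException,
        // so add the year on a UTC OffsetDateTime and convert back.
        Instant cutoff = o.lastDealDate.atOffset(ZoneOffset.UTC).plusYears(1).toInstant();
        return !o.isActive && !now.isBefore(cutoff);
    }
}
```

Keeping the predicate free of any Flink types means the same check can be reused whichever state layout ends up holding the objects.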

What would be the proper way of going about this, i.e. letting Flink know about these two conditions so that it removes such entries automatically? Currently, I read all the items in the state, clear the state, and then add back the relevant ones. However, this will take longer and longer as the number of clients increases and the state grows. Most of what I found online suggests using state time-to-live (TTL), configured on the state descriptor. However, my logic can't rely on processing, event, or ingestion time, and I also need to check whether isActive is false.
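For comparison, the current read-filter-rewrite pass boils down to the following. It is modeled with a plain `Iterable`/`List` so it runs without Flink; in an operator, the input would come from `ListState.get()` and the survivors could be written back with `ListState.update(...)`, which replaces the list contents in one call. Either way, every element is read and every survivor is written again, so the cost grows with the total state size rather than with the number of expired entries. `ListRewrite` and `rewrite` are hypothetical names, and the expiry rule is the same assumed predicate as described in the question.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

class BusinessObject {
    long clientId;
    String region;
    Instant lastDealDate;
    boolean isActive;

    BusinessObject(long clientId, String region, Instant lastDealDate, boolean isActive) {
        this.clientId = clientId;
        this.region = region;
        this.lastDealDate = lastDealDate;
        this.isActive = isActive;
    }
}

class ListRewrite {
    // Inactive AND at least one UTC calendar year since the last deal.
    static boolean isExpired(BusinessObject o, Instant now) {
        Instant cutoff = o.lastDealDate.atOffset(ZoneOffset.UTC).plusYears(1).toInstant();
        return !o.isActive && !now.isBefore(cutoff);
    }

    // Models the current approach: read everything, keep the survivors,
    // and hand back the full list to be written to state again.
    static List<BusinessObject> rewrite(Iterable<BusinessObject> current, Instant now) {
        List<BusinessObject> kept = new ArrayList<>();
        for (BusinessObject o : current) { // touches every stored element
            if (!isExpired(o, now)) {
                kept.add(o);
            }
        }
        return kept;
    }
}
```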

Extra info: the context is not keyed, and the state backend is RocksDB. A ListState is used because all of the relevant state/history (per the conditions above) needs to be dumped every day.

Any suggestions?


Solution

  • With the RocksDB state backend, Flink can append to ListState without going through serialization/deserialization, but any read or modification other than an append is expensive because of ser/de.

    You'll be better off if you can rework things so that these BusinessObjects are stored in MapState, even if you occasionally have to iterate over the entire map. Each key/value pair in the MapState will be a separate RocksDB entry, and you'll be able to individually create/update/delete them without having to go through ser/de for the entire map (unless you do have to scan it). (For what it's worth, iterating over MapState in RocksDB proceeds through the map in serialized-key-sorted order.)

    MapState is only available as keyed (or broadcast) state, so this change would require you to key the stream. Using keyBy does force a network shuffle (and ser/de), so it will be expensive, but not as expensive as using ListState.
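A minimal sketch of the MapState layout suggested above, modeled with a `java.util.Map<Long, BusinessObject>` standing in for Flink's `MapState<Long, BusinessObject>` so it runs without Flink. It assumes the stream has already been keyed (what to key by is left open) and that each key's map uses `clientId` as the map key. `MapStatePruning`, `upsert`, and `dumpAndPrune` are hypothetical names. Since the daily dump has to scan every entry anyway, the same pass can delete the expired ones individually.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BusinessObject {
    long clientId;
    String region;
    Instant lastDealDate;
    boolean isActive;

    BusinessObject(long clientId, String region, Instant lastDealDate, boolean isActive) {
        this.clientId = clientId;
        this.region = region;
        this.lastDealDate = lastDealDate;
        this.isActive = isActive;
    }
}

class MapStatePruning {
    // Inactive AND at least one UTC calendar year since the last deal.
    static boolean isExpired(BusinessObject o, Instant now) {
        Instant cutoff = o.lastDealDate.atOffset(ZoneOffset.UTC).plusYears(1).toInstant();
        return !o.isActive && !now.isBefore(cutoff);
    }

    // Touches only this client's entry; with RocksDB-backed MapState,
    // put(clientId, obj) writes a single key/value pair.
    static void upsert(Map<Long, BusinessObject> state, BusinessObject o) {
        state.put(o.clientId, o);
    }

    // Daily pass: collect live entries for the dump and delete expired ones.
    // Expired keys are gathered first and removed afterwards, so the code does
    // not rely on the state's iterator supporting remove().
    static List<BusinessObject> dumpAndPrune(Map<Long, BusinessObject> state, Instant now) {
        List<BusinessObject> live = new ArrayList<>();
        List<Long> expired = new ArrayList<>();
        for (Map.Entry<Long, BusinessObject> e : state.entrySet()) {
            if (isExpired(e.getValue(), now)) {
                expired.add(e.getKey());
            } else {
                live.add(e.getValue());
            }
        }
        for (Long clientId : expired) {
            state.remove(clientId); // one individual delete per expired client
        }
        return live;
    }
}
```

In a real keyed function, the map would be obtained with something like `getRuntimeContext().getMapState(new MapStateDescriptor<>("clients", Long.class, BusinessObject.class))` (the state name is hypothetical) and iterated via `entries()`; Flink's `MapState` methods declare `throws Exception`. How the daily pass gets triggered for every key (for example, with timers) is not covered by this sketch.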