
Apache Flink: How often is state de/serialized?


How frequently does Flink de/serialize operator state: on every get/update, or only when a checkpoint is taken? Does the state backend make a difference?

I suspect that for a keyed stream with a high-cardinality key (millions of distinct keys) and thousands of events per second for each key, de/serialization might be a big issue. Am I right?


Solution

  • Your assumption is correct. It depends on the state backend.

    Backends that store state on the JVM heap (MemoryStateBackend and FsStateBackend) do not serialize state for regular reads and writes; they keep it as Java objects on the heap. This makes accesses very fast, but the total state size is bounded by the JVM heap, and a large number of long-lived state objects can cause garbage collection issues. Only when a checkpoint is taken are the objects serialized and persisted, so that the job can recover after a failure. (Since Flink 1.13, both backends are superseded by HashMapStateBackend, which keeps state on the heap in the same way.)
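
    To make this concrete, here is a toy Java model of a heap-based backend. It is a sketch under simplifying assumptions, not Flink's implementation: ToyHeapStateBackend and its serializations counter are hypothetical names, the state is a per-key Long counter, and only values (not keys) are serialized at checkpoint time. It shows that reads and writes never touch a serializer, while a checkpoint serializes every entry once.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Toy model of a heap-based keyed state backend; NOT Flink's actual implementation.
// Values live as Java objects in a HashMap; bytes are produced only at checkpoint time.
class ToyHeapStateBackend {
    private final Map<String, Long> table = new HashMap<>();
    int serializations = 0; // number of values converted to bytes so far

    Long get(String key) { return table.get(key); }             // object lookup, no serialization
    void put(String key, Long value) { table.put(key, value); } // stores a reference, no serialization

    // Checkpoint: serialize every value once (keys are kept as Strings for brevity).
    Map<String, byte[]> checkpoint() {
        Map<String, byte[]> snapshot = new HashMap<>();
        for (Map.Entry<String, Long> e : table.entrySet()) {
            snapshot.put(e.getKey(), ByteBuffer.allocate(Long.BYTES).putLong(e.getValue()).array());
            serializations++;
        }
        return snapshot;
    }
}

class ToyHeapStateBackendDemo {
    public static void main(String[] args) {
        ToyHeapStateBackend backend = new ToyHeapStateBackend();
        // 10,000 read-modify-write updates spread over 100 keys, like a per-key counter.
        for (int i = 0; i < 10_000; i++) {
            String key = "key-" + (i % 100);
            Long count = backend.get(key);
            backend.put(key, count == null ? 1L : count + 1);
        }
        System.out.println("serializations before checkpoint: " + backend.serializations); // 0
        backend.checkpoint();
        System.out.println("serializations after checkpoint: " + backend.serializations);  // 100
    }
}
```

    The 10,000 updates cause no serialization at all; the single checkpoint then serializes each of the 100 values once.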

    In contrast, the RocksDBStateBackend (EmbeddedRocksDBStateBackend since Flink 1.13) stores all state as serialized byte arrays in embedded RocksDB instances. Therefore, it de/serializes the state of the current key on every read and write access. You can control how much state is de/serialized per access by choosing an appropriate state primitive, e.g., ValueState, ListState, or MapState.
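
    The same workload against a byte-oriented store can be sketched as follows. This is again a hypothetical toy: ToyByteArrayStateBackend is not a Flink or RocksDB class, and a HashMap keyed by ByteBuffer (which compares contents) stands in for RocksDB. It only counts how often keys and values are converted between objects and bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for a RocksDB-style backend (NOT Flink or RocksDB code). Keys and values
// exist only as byte arrays, so every get/put goes through (de)serialization.
class ToyByteArrayStateBackend {
    // ByteBuffer has content-based equals/hashCode, so it works as a byte-array map key.
    private final Map<ByteBuffer, byte[]> store = new HashMap<>();
    int keySerializations = 0;
    int valueSerializations = 0;
    int valueDeserializations = 0;

    private ByteBuffer serializeKey(String key) {
        keySerializations++; // the key is encoded for every lookup and every write
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }

    Long get(String key) {
        byte[] raw = store.get(serializeKey(key));
        if (raw == null) {
            return null;
        }
        valueDeserializations++; // bytes -> Long on every successful read
        return ByteBuffer.wrap(raw).getLong();
    }

    void put(String key, long value) {
        valueSerializations++; // Long -> bytes on every write
        store.put(serializeKey(key), ByteBuffer.allocate(Long.BYTES).putLong(value).array());
    }
}

class ToyByteArrayStateBackendDemo {
    public static void main(String[] args) {
        ToyByteArrayStateBackend backend = new ToyByteArrayStateBackend();
        // Per-key counter workload: 10,000 read-modify-write updates over 100 keys.
        for (int i = 0; i < 10_000; i++) {
            String key = "key-" + (i % 100);
            Long count = backend.get(key);
            backend.put(key, count == null ? 1L : count + 1);
        }
        System.out.println("key serializations:     " + backend.keySerializations);     // 20000
        System.out.println("value serializations:   " + backend.valueSerializations);   // 10000
        System.out.println("value deserializations: " + backend.valueDeserializations); // 9900
    }
}
```

    Every read-modify-write costs two key serializations, one value serialization, and (once the key exists) one value deserialization, so the cost scales with the access rate rather than with the checkpoint interval.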

    For example, a ValueState is always de/serialized as a whole, whereas MapState.get(key) only serializes the key (for the lookup) and deserializes the value stored under that key. Hence, you should use MapState<String, String> instead of ValueState<HashMap<String, String>>. Similar considerations apply to the other state primitives.
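
    The following toy comparison illustrates the difference. The classes ToyCodec, ToyValueStateOfMap, and ToyMapState are hypothetical, and the length-prefixed encoding is a simplification, not Flink's serializers. The sketch measures how many bytes must be decoded to read a single entry when a 1,000-entry map is stored as one ValueState value versus as individual MapState entries.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy encoding helpers: simplified length-prefixed format, NOT Flink's serializers.
class ToyCodec {
    static void putString(ByteBuffer buf, String s) {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(b.length).put(b);
    }

    static String getString(ByteBuffer buf) {
        byte[] b = new byte[buf.getInt()];
        buf.get(b);
        return new String(b, StandardCharsets.UTF_8);
    }

    // Layout: [entry count][len][key bytes][len][value bytes]...
    static byte[] serializeMap(Map<String, String> map) {
        int size = Integer.BYTES;
        for (Map.Entry<String, String> e : map.entrySet()) {
            size += 2 * Integer.BYTES + e.getKey().getBytes(StandardCharsets.UTF_8).length
                    + e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size).putInt(map.size());
        for (Map.Entry<String, String> e : map.entrySet()) {
            putString(buf, e.getKey());
            putString(buf, e.getValue());
        }
        return buf.array();
    }

    static HashMap<String, String> deserializeMap(byte[] blob) {
        ByteBuffer buf = ByteBuffer.wrap(blob);
        int n = buf.getInt();
        HashMap<String, String> map = new HashMap<>();
        for (int i = 0; i < n; i++) {
            String key = getString(buf);
            map.put(key, getString(buf));
        }
        return map;
    }
}

// Mimics ValueState<HashMap<String, String>> on a byte-oriented backend: one blob per key.
class ToyValueStateOfMap {
    private byte[] blob = ToyCodec.serializeMap(new HashMap<>());
    long bytesDeserialized = 0;

    HashMap<String, String> value() {
        bytesDeserialized += blob.length; // the WHOLE map is decoded on every read
        return ToyCodec.deserializeMap(blob);
    }

    void update(Map<String, String> map) {
        blob = ToyCodec.serializeMap(map); // the WHOLE map is encoded on every write
    }
}

// Mimics MapState<String, String>: each user key maps to its own serialized value.
class ToyMapState {
    private final Map<ByteBuffer, byte[]> entries = new HashMap<>();
    long bytesDeserialized = 0;

    private static ByteBuffer serializeKey(String userKey) {
        return ByteBuffer.wrap(userKey.getBytes(StandardCharsets.UTF_8));
    }

    String get(String userKey) {
        byte[] raw = entries.get(serializeKey(userKey)); // only the user key is encoded
        if (raw == null) {
            return null;
        }
        bytesDeserialized += raw.length; // only this one value is decoded
        return new String(raw, StandardCharsets.UTF_8);
    }

    void put(String userKey, String value) {
        entries.put(serializeKey(userKey), value.getBytes(StandardCharsets.UTF_8));
    }
}

class ValueVsMapStateDemo {
    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        ToyMapState mapState = new ToyMapState();
        for (int i = 0; i < 1_000; i++) {
            data.put("k" + i, "v" + i);
            mapState.put("k" + i, "v" + i);
        }
        ToyValueStateOfMap valueState = new ToyValueStateOfMap();
        valueState.update(data);
        // Read the same single entry through each layout and compare decoded bytes.
        System.out.println(valueState.value().get("k42") + " / " + mapState.get("k42"));
        System.out.println("ValueState bytes decoded: " + valueState.bytesDeserialized);
        System.out.println("MapState bytes decoded:   " + mapState.bytesDeserialized);
    }
}
```

    With this encoding, reading "k42" decodes the whole 15,784-byte blob in the ValueState layout but only the 3-byte value in the MapState layout. The exact sizes depend on the encoding; the point is that the first grows with the size of the map and the second does not.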

    The RocksDBStateBackend checkpoints its state by copying its files to a persistent filesystem. Because the state is already stored as bytes, no additional serialization is involved when a checkpoint is taken.
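
    A minimal sketch of that point, assuming a toy store that keeps everything as byte arrays: ToyBytesStoreWithCheckpoint is hypothetical, and an in-memory byte-for-byte copy stands in for copying files to a filesystem. The checkpoint copies the bytes verbatim, so the serializer counter does not move.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy sketch (NOT Flink/RocksDB code): state already lives as bytes, so a checkpoint
// can copy those bytes as-is without invoking the value serializer.
class ToyBytesStoreWithCheckpoint {
    private final Map<ByteBuffer, byte[]> store = new HashMap<>();
    int serializerCalls = 0; // counts long -> bytes conversions

    private static ByteBuffer key(String k) {
        return ByteBuffer.wrap(k.getBytes(StandardCharsets.UTF_8));
    }

    void put(String k, long value) {
        serializerCalls++; // serialization happens at write time
        store.put(key(k), ByteBuffer.allocate(Long.BYTES).putLong(value).array());
    }

    Long get(String k) {
        byte[] raw = store.get(key(k));
        return raw == null ? null : ByteBuffer.wrap(raw).getLong();
    }

    // Stand-in for "copy the files to a persistent filesystem": clone raw bytes only.
    Map<ByteBuffer, byte[]> checkpoint() {
        Map<ByteBuffer, byte[]> copy = new HashMap<>();
        for (Map.Entry<ByteBuffer, byte[]> e : store.entrySet()) {
            copy.put(ByteBuffer.wrap(e.getKey().array().clone()), e.getValue().clone());
        }
        return copy;
    }
}

class ToyCheckpointDemo {
    public static void main(String[] args) {
        ToyBytesStoreWithCheckpoint store = new ToyBytesStoreWithCheckpoint();
        for (int i = 0; i < 50; i++) {
            store.put("key-" + i, i);
        }
        int before = store.serializerCalls;
        Map<ByteBuffer, byte[]> snapshot = store.checkpoint();
        System.out.println("entries in snapshot: " + snapshot.size()); // 50
        System.out.println("serializer calls during checkpoint: " + (store.serializerCalls - before)); // 0
    }
}
```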