Search code examples
apache-flinkamazon-kinesisamazon-kinesis-analytics

How to stop high load from leading to cascading Flink checkpoint failures


A couple of points i'll volunteer up front:

  1. I'm new to Flink (working with it for about a month now)
  2. I'm using Kinesis Analytics (AWS hosted Flink solution). By all accounts this doesn't really limit the versatility of Flink or the options for fault tolerance, but I'll call it out anyways.

We have a fairly straight forward sliding window application. A keyed stream organizes events by a particular key, IP address for example, and then processes them in a ProcessorFunction. We mostly use this to keep track of counts of things. For example, how many logins for a particular IP address in the last 24 hours. Every 30 seconds we count the events in the window, per key, and save that value to an external data store. State is also updated to reflect the events in that window so that old events expire and aren't taking up memory.

Interestingly enough, cardinality is not an issue. If we have 200k folks logging in, in a 24 hour period, everything is perfect. Things start to get hairy when one IP logs in 200k times in 24 hours. At this point, checkpoints start to take longer and longer. An average checkpoint takes 2-3 seconds, but with this user behaviour, the checkpoints start to take 5 minutes, then 10, then 15, then 30, then 40, etc etc.

The application can run smoothly in this condition for a while, surprisingly. Perhaps 10 or 12 hours. But, sooner or later checkpoints completely fail and then our max iterator age starts to spike, and no new events are processed etc etc.

I've tried a few of things at this point:

  1. Throwing more metal at the problem (auto scaling turned on as well)
  2. Fussing with CheckpointingInterval and MinimumPauseBetweenCheckpoints https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CheckpointConfiguration.html
  3. Refactoring to reduce the footprint of the state we store

(1) didn't really do much. (2) This appeared to help but then another much larger traffic spike then what we'd seen before squashed any of the benefits (3) It's unclear if this helped. I think our application memory footprint is fairly small compared to what you'd imagine from a Yelp or an Airbnb who both use Flink clusters for massive applications so I can't imagine that my state is really problematic.

I'll say I'm hoping we don't have to deeply change the expectations of the application output. This sliding window is a really valuable piece of data.

EDIT: Somebody asked about what my state looks like it's a ValueState[FooState]

case class FooState(
                         entityType: String,
                         entityID: String,
                         events: List[BarStateEvent],
                         tableName: String,
                         baseFeatureName: String,
                       )

case class BarStateEvent(target: Double, eventID: String, timestamp: Long)

EDIT: I want to highlight something that user David Anderson said in the comments:

One approach sometimes used for implementing sliding windows is to use MapState, where the keys are the timestamps for the slices, and the values are lists of events.

This was essential. For anybody else trying to walk this path, I couldn't find a workable solution that didn't bucket events into some time slice. My final solution involves bucketing events into batches of 30 seconds and then writing those into map state as David suggested. This seems to do the trick. For our high periods of load, checkpoints remain at 3mb and they always finish in under a second.


Solution

  • If you have a sliding window that is 24 hours long, and it slides by 30 seconds, then every login is assigned to each of 2880 separate windows. That's right, Flink's sliding windows make copies. In this case 24 * 60 * 2 copies.

    If you are simply counting login events, then there is no need to actually buffer the login events until the windows close. You can instead use a ReduceFunction to perform incremental aggregation.

    My guess is that you aren't taking advantage of this optimization, and thus when you have a hot key (ip address), then the instance handling that hot key has a disproportionate amount of data, and takes a long time to checkpoint.

    On the other hand, if you are already doing incremental aggregation, and the checkpoints are as problematic as you describe, then it's worth looking more deeply to try to understand why.

    One possible remediation would be to implement your own sliding windows using a ProcessFunction. By doing so you could avoid maintaining 2880 separate windows, and use a more efficient data structure.

    EDIT (based on the updated question):

    I think the issue is this: When using the RocksDB state backend, state lives as serialized bytes. Every state access and update has to go through ser/de. This means that your List[BarStateEvent] is being deserialized and then re-serialized every time you modify it. For an IP address with 200k events in the list, that's going to be very expensive.

    What you should do instead is to use either ListState or MapState. These state types are optimized for RocksDB. The RocksDB state backend can append to ListState without deserializing the list. And with MapState, each key/value pair in the map is a separate RocksDB object, allowing for efficient lookups and modifications.

    One approach sometimes used for implementing sliding windows is to use MapState, where the keys are the timestamps for the slices, and the values are lists of events. There's an example of doing something similar (but with tumbling windows) in the Flink docs.

    Or, if your state can fit into memory, you could use the FsStateBackend. Then all of your state will be objects on the JVM heap, and ser/de will only come into play during checkpointing and recovery.