apache-flink, flink-streaming, amazon-eks

Flink production session cluster in EKS instance failure and recovery


I am a newbie to Flink, planning to deploy a Flink session cluster on EKS with 1 job manager and 5 task managers (each task manager with 4 slots). Different jobs will be submitted through the UI for different use cases.

Let's say I have submitted a stateful job (with simple counter logic in a RichFlatMapFunction) backed by RocksDBStateBackend, with checkpointDataUri pointing to S3 and DbStoragePath pointing to a local file path. The job uses 8 slots in total, spread across two task managers, and has been running fine without any issues for a day. A minimal sketch of the setup follows. Now here are my questions:
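
Roughly, the configuration looks like this (the bucket name and local path are placeholders; this uses Flink's RocksDBStateBackend API):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StatefulCounterJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints are written to S3 (checkpointDataUri);
        // "true" enables incremental checkpoints
        RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://my-bucket/flink/checkpoints", true);

        // RocksDB keeps its working files on the task manager's local disk
        backend.setDbStoragePath("/data/flink/rocksdb");

        env.setStateBackend(backend);
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        // ... source, the RichFlatMapFunction counter, sink ...
        env.execute("stateful-counter");
    }
}
```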

1) My understanding of checkpointDataUri and DbStoragePath in RocksDBStateBackend is that checkpointDataUri stores the processed offset information in S3 (since I configured checkpointDataUri with an S3 prefix), while DbStoragePath holds all the state used in the RichFlatMapFunction. So all of the stateful information is stored in DbStoragePath, which is available locally only. Please correct me if this is wrong.

2) Let's say my EC2 instance (the one where the 4 slots were utilised) was restarted for some reason and took around 30 minutes to come back online. In this case EKS will bring up a new EC2 instance as a task manager to match the replica count, but will the Flink job manager try to reschedule those 4 slots onto a different task manager in the meantime? If yes, how is the state that was stored on the old instance's local disk recovered?

3) Is there any document/video on Flink failure recovery in EKS? I saw the official documentation, which explains how to deploy a Flink session cluster on EKS, but I couldn't find anything about failure recovery in EKS mode. Could someone please point me in the right direction?


Solution

  • All of the state you are concerned about, namely the processed offsets and the state used in the RichFlatMapFunction (and any other state Flink is managing for your job), is stored both on the local disk (DbStoragePath) and in S3 (checkpointDataUri).

    Flink always keeps a working copy of all of the state local to each task manager (for high throughput and low latency), and in the background makes complete copies of this state to a distributed file system (like S3) for reliability.

    In other words, what you said in point (1) of your question was incorrect. And the answer to point (2) is that any state that is not available locally can always be recovered from S3. As for point (3), there's nothing special about failure recovery on EKS compared to any other Flink deployment model; the same mechanism, sketched below, applies everywhere.
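
As a minimal sketch (not part of the original setup; the checkpoint interval and retry values are illustrative assumptions), these are the kinds of job settings that let Flink restore from the latest S3 checkpoint automatically after such a failure:

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RecoverySettings {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000); // complete a checkpoint every 60 s

        // Keep completed checkpoints in S3 even if the job is cancelled,
        // so they remain usable for manual restores as well
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // On failure, restart the job (up to 10 attempts, 30 s apart);
        // each restart reloads state from the latest checkpoint in S3
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(30)));

        // ... job graph ...
        env.execute("recovery-settings-demo");
    }
}
```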