
Handle checkpoint timeouts in Apache Flink for a long-running event


We have an application with a set of rules and a set of members. Whenever a new rule arrives as an event, we need to run that rule against every member. There are millions of members, and they are stored in keyed state.

Processing this event takes hours, and the job works when checkpoints are not enabled. When checkpoints are enabled, the checkpoint times out after 10 minutes.

One solution I was considering is to split the job into two.

  • The first job will take the rule event, form a Tuple<Rule, Member> for each member, and publish the tuples to an intermediate Kafka topic. This job replicates the rule event for each of the members (a sketch of such a sink follows this list).

  • In the second job, the actual rule processing will happen.
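
For reference, a minimal sketch of what the first job's Kafka output might look like, assuming Flink's KafkaSink from the flink-connector-kafka module; the broker address, topic name, and TupleSerializer are placeholders, not part of the original design:

KafkaSink<Tuple2<Rule, Member>> fanOutSink =
    KafkaSink.<Tuple2<Rule, Member>>builder()
        .setBootstrapServers("kafka:9092")  // placeholder broker address
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                .setTopic("rule-member-fanout")  // placeholder topic name
                .setValueSerializationSchema(new TupleSerializer())  // assumed custom SerializationSchema
                .build())
        .build();

// ruleMemberTuples is the DataStream<Tuple2<Rule, Member>> built by the first job.
ruleMemberTuples.sinkTo(fanOutSink);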

Is there a better way to handle this scenario?

Edit

These are the steps the current job performs:

memberStream
  .keyBy(member -> member.getMembershipNumber()) // keyed by membership number
  .connect(ruleStream.broadcast())
  .process(
    // This operator stores the member in keyed state when a member event arrives.
    // When a rule event arrives, it forms a Tuple<Rule, Member> for each member
    // in the state and forwards it to the next operator.
  )
  .flatMap(
    // Does the rule processing.
  )
  .addSink(
    // Writes the results to the DB.
  );
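
For concreteness, a hypothetical sketch of that connect/process step as a KeyedBroadcastProcessFunction; the Member and Rule types, the getMembershipNumber key selector, and the state names are assumptions. Note that broadcast() takes a MapStateDescriptor in this variant, which is what makes applyToKeyedState available on the broadcast side:

MapStateDescriptor<Void, Rule> ruleDescriptor =
    new MapStateDescriptor<>("rules", Types.VOID, TypeInformation.of(Rule.class));

DataStream<Tuple2<Rule, Member>> ruleMemberTuples = memberStream
    .keyBy(Member::getMembershipNumber)  // assumed String-valued key selector
    .connect(ruleStream.broadcast(ruleDescriptor))
    .process(new KeyedBroadcastProcessFunction<String, Member, Rule, Tuple2<Rule, Member>>() {

        private final ValueStateDescriptor<Member> memberDescriptor =
            new ValueStateDescriptor<>("member", Member.class);

        @Override
        public void processElement(Member member, ReadOnlyContext ctx,
                                   Collector<Tuple2<Rule, Member>> out) throws Exception {
            // Store (or refresh) this member in keyed state when a member event arrives.
            getRuntimeContext().getState(memberDescriptor).update(member);
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx,
                                            Collector<Tuple2<Rule, Member>> out) throws Exception {
            // When a rule event arrives, iterate over every member in the keyed
            // state and emit one tuple per member; with millions of members,
            // this is the step that runs for hours.
            ctx.applyToKeyedState(memberDescriptor, (key, state) ->
                out.collect(Tuple2.of(rule, state.value())));
        }
    });

// ruleMemberTuples then feeds the flatMap and sink shown above.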

Solution

  • The reason the job is failing is that a checkpoint is timing out, and the checkpoint is timing out because the operator with the keyed state runs for hours in response to a single rule event. While that operator is busy, the checkpoint barrier cannot trigger a checkpoint in that operator and then continue downstream.

    You've proposed having the operator with the keyed state emit a Tuple<Rule, Member> for each affected Member, and sending those tuples to another job (via Kafka). An alternative would be to keep that rule processing logic in the original job, but in an operator downstream of the one holding all of the keyed state. Go ahead and emit the tuples, as proposed, but process them in the original job. You'll also need to enable unaligned checkpointing so that the checkpoints can complete before timing out; a configuration sketch follows.
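
    As a rough sketch, enabling unaligned checkpoints (together with a longer checkpoint timeout) might look like the following; the interval and timeout values are illustrative, not recommendations:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint once a minute (illustrative value).
        env.enableCheckpointing(60_000);

        // Unaligned checkpoints let barriers overtake buffered in-flight records,
        // so checkpoints can complete while the rule tuples are still being processed.
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        // Raise the timeout above the 10 minutes currently being hit (illustrative value).
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000);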

    One potential problem with this alternative is that the checkpoints taken during rule processing will become larger, since unaligned checkpoints persist the in-flight records; but if there are only millions of these tuples, that shouldn't be too painful.

    (Another approach would be to put a flatMap in front of the operator holding all of the keyed member state that expands each new rule into 100 partial rules, each affecting only 1% of the members. Same basic idea, just turned around; a sketch follows.)
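
    A hypothetical sketch of that pre-splitting flatMap; the PartialRule type and the 100-way slicing by membership number are invented for illustration:

        // Expand each incoming rule into 100 partial rules, each responsible for
        // the slice of members whose membership number maps to that slice
        // (e.g., via hash modulo 100). PartialRule is an assumed type.
        DataStream<PartialRule> partialRules = ruleStream
            .flatMap((Rule rule, Collector<PartialRule> out) -> {
                for (int slice = 0; slice < 100; slice++) {
                    out.collect(new PartialRule(rule, slice));
                }
            })
            .returns(PartialRule.class);

    Downstream, the operator holding the member state would handle one slice at a time, so a checkpoint barrier waits at most for roughly 1% of the total work instead of hours.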