Tags: apache-kafka, apache-flink, flink-streaming

Flink unable to restore operator state for a kafka consumer when starting from checkpoint


We have a Streaming Job that has 20 separate pipelines, with each pipeline having one/many Kafka topic sources.

We are noticing strange behavior when restoring operator state from a checkpoint after restarting the job with a new jar (one additional pipeline added) and allowNonRestoredState=true.

  • Our pipeline configuration is static; we make code changes whenever we add a new pipeline.
  • We have not set a UID for any of the operators.
  • When we restart the job from the checkpoint with the same jar, all operator state is restored with nothing missing.
  • We maintain a parallelism of 1 for all operators.
  • Some of the Kafka source operators are unable to restore their operator state and log `org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] - Consumer subtask 0 has no restore state.` As a result, the Kafka consumer offsets for all partitions are reset to EARLIEST.

Flink version: 1.13.0

Is this a known issue in Flink?

Note: in some cases we have the same topic in two pipelines, each with a different consumer group. As per my understanding, this should not impact state restoration, since the Kafka union state is kept per Kafka consumer source in each pipeline.


Solution

  • Since you haven't explicitly provided UIDs for the operators, you are relying on automatically generated UIDs. These are stable only as long as the job's topology remains unchanged. Adding a new pipeline can change some or all of the previously auto-generated UIDs, making that state unrestorable.

    If you want to ensure you will be able to restore state, set UIDs on all stateful operators. See Flink's Production Readiness Checklist for details.
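    A minimal sketch of what this looks like, using `uid()` on each stateful operator; the topic, group, and UID strings here are hypothetical placeholders:

    ```java
    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class UidExampleJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical Kafka settings for illustration only.
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "pipeline-a-group");

            FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
                "pipeline-a-topic", new SimpleStringSchema(), props);

            env.addSource(source)
               // A stable, explicit UID: survives topology changes,
               // so this source's offset state remains restorable.
               .uid("pipeline-a-kafka-source")
               .map(String::toUpperCase)
               .uid("pipeline-a-uppercase-map")
               .print();

            env.execute("uid-example");
        }
    }
    ```

    The UID strings only need to be unique within the job and stable across deployments; adding a 21st pipeline then has no effect on the state of the existing 20.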

    If you want to explicitly set UIDs that match the currently auto-generated ones so you can safely evolve the job, you can find the hashed UIDs for each operator by inspecting the running job via the REST API (the vertexId for each operator is its hashed UID). Then you can use those hashed UIDs in combination with setUidHash() on those same operators in your code. See Matching Operator State in the Flink documentation.
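    As a sketch, assuming you have already looked up an operator's vertexId via the REST API (GET /jobs/:jobid), pinning it could look like this; the 32-character hex hash below is a placeholder, not a real value from your job:

    ```java
    // Sketch: pin operators to their previously auto-generated UID hashes.
    // Each hash is a placeholder; substitute the vertexId that the REST API
    // reports for the corresponding operator in the currently running job.
    env.addSource(source)
       .setUidHash("bc764cd8ddf7a0cff126f51c16239658")  // old auto UID of the source
       .map(String::toUpperCase)
       .setUidHash("ea632d67b7d595e5b851708ae9ad79d6"); // old auto UID of the map
    ```

    Once the job has been restored this way, you can additionally attach explicit `uid()` values and take a savepoint, after which the hashes are no longer needed.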