Tags: apache-flink, savepoints, flink-table-api

"Cannot map checkpoint/savepoint state for operator" when using fromChangelogStream


I want to use the savepoint mechanism to move existing jobs from one version of Flink to another, by:

  1. Stopping a job with a savepoint
  2. Creating a new job from the savepoint, on the new version.

Up to Flink 1.14 this works without problems, but in Flink 1.15.1 it fails. It fails even when I do not change the version and stay on 1.15.1. I get the following error, which means that Flink could not map the state of one operator from the previous job to the new one:

Failed to rollback to checkpoint/savepoint hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c to the new program, because the operator is not available in the new program.

After investigating, I found that the problematic operator is a ChangelogNormalize operator, which I do not create explicitly. It is generated because I use tableEnv.fromChangelogStream(stream, schema, ChangelogMode.upsert()) (the upsert mode matters; other modes do not fail). The resulting table is then used in a query through the SQL API, which generates something like:

ChangelogNormalize[8] -> Calc[9] -> TableToDataSteam -> [my_sql_transformation] -> [my_sink]

In previous versions of Flink, this operator was always given the same uid, so its state could be matched when starting from the savepoint. In Flink 1.15.1, a different uid is generated every time. I could not find a reliable way to set that uid manually; the only way I found was to walk backwards from the transformation:

dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");

but I would expect there to be a better way to do this.
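Until there is an official API for this, the hard-coded get(0) chain can at least be replaced by a search that locates the upstream transformation by name, so it keeps working if the number of operators between ChangelogNormalize and the DataStream changes. This is a minimal sketch, not a Flink API: the class UpstreamSearch and its findUpstream method are hypothetical, and it assumes the planner-generated transformation's name starts with "ChangelogNormalize" (as in the operator chain shown in the question). The search is plain Java and is parameterised over how to obtain a node's inputs, so it can be pointed at Flink's Transformation via Transformation::getInputs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical helper: finds an upstream node instead of hard-coding get(0) chains. */
final class UpstreamSearch {

    private UpstreamSearch() {}

    /**
     * Breadth-first search starting at {@code start} (inclusive) and following
     * {@code inputs} upstream; returns the first node accepted by {@code match}.
     */
    public static <T> Optional<T> findUpstream(
            T start,
            Function<T, ? extends List<? extends T>> inputs,
            Predicate<? super T> match) {
        Deque<T> queue = new ArrayDeque<>();
        Set<T> seen = new HashSet<>();
        queue.add(start);
        while (!queue.isEmpty()) {
            T current = queue.poll();
            if (!seen.add(current)) {
                continue; // already visited: inputs can be shared in a DAG
            }
            if (match.test(current)) {
                return Optional.of(current);
            }
            List<? extends T> upstream = inputs.apply(current);
            if (upstream != null) { // treat a missing input list as "no inputs"
                queue.addAll(upstream);
            }
        }
        return Optional.empty();
    }
}
```

In the job, the call would look something like UpstreamSearch.<Transformation<?>>findUpstream(dataStream.getTransformation(), Transformation::getInputs, t -> t.getName().startsWith("ChangelogNormalize")).ifPresent(t -> t.setUid("the_user_defined_id")). The explicit <Transformation<?>> type witness is needed because getInputs() returns List<Transformation<?>>, which does not fit a more specific element type such as Transformation<Row>.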

Do you have any idea what I am doing wrong? Could this be a bug in Flink?


Solution

  • After I opened an issue on JIRA, it does seem to be a bug.

    One workaround for now is to set table.exec.legacy-transformation-uids to true.
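In code, the workaround amounts to setting that option on the table environment's configuration. This is a minimal sketch, assuming a StreamTableEnvironment as in the question; the class LegacyUidWorkaround and its createTableEnv method are hypothetical names, and setting the option before any fromChangelogStream/toDataStream call is an assumption about when the planner reads it, not something confirmed in the answer.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/** Hypothetical helper that creates a table environment with the workaround applied. */
final class LegacyUidWorkaround {

    private LegacyUidWorkaround() {}

    static StreamTableEnvironment createTableEnv(StreamExecutionEnvironment env) {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Workaround from the answer: use the legacy uid behaviour for transformations
        // generated by the table planner (such as ChangelogNormalize).
        // Assumption: set it before converting any table or stream, since the option
        // is presumably read when the query is translated into transformations.
        tableEnv.getConfig()
                .getConfiguration()
                .setString("table.exec.legacy-transformation-uids", "true");
        return tableEnv;
    }
}
```

The rest of the job (fromChangelogStream with ChangelogMode.upsert(), the SQL query, and the sink) would then be built on the returned tableEnv unchanged.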