Jobs stuck while trying to restart from a checkpoint


Context

We are using Flink to run a number of streaming jobs that read from Kafka, perform some SQL transformation and write the output to Kafka. It runs on Kubernetes with two jobmanagers and many taskmanagers. Our jobs use checkpointing with RocksDB and our checkpoints are written on a bucket in AWS S3.

Recently, we upgraded from Flink 1.13.1 to Flink 1.15.2. We used the savepoint mechanism to stop our jobs and restart them on the new version. We have two Kubernetes clusters. Right after the migration, everything seemed fine on both of them. But some time later (almost a month for the first cluster, 2 or 3 days for the second one), other problems appeared. They may or may not be related to the migration to Flink 1.15, as they happened later.

Description of the problem

Recently, we noticed that a few jobs failed to start. We see that the "Source" tasks in the execution graph stay CREATED while all others down in the graph (ChangelogNormalize, Writer) are RUNNING. The jobs restart regularly with the error (stacktrace simplified for readability):

java.lang.Exception: Cannot deploy task Source: source_consumer -> *anonymous_datastream_source$81*[211] (1/8) (de8f109e944dfa92d35cdc3f79f41e6f) - TaskManager (<address>) not responding after a rpcTimeout of 10000 ms
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:602)
    ...
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [akka.tcp://flink@<address>/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
    at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:580)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@<address>/user/rpc/taskmanager_0#1723317240]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

We also noticed this message in the jobmanager logs:

Discarding oversized payload sent to Actor[akka.tcp://flink@<address>/user/rpc/taskmanager_0#1153219611]: max allowed size 10485760b bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 83938405 bytes.

It is not clear why such big Akka messages are sent. But when setting akka.framesize to a higher value (100MB), the timeout indeed disappears, and the tasks that were stuck in CREATED are now INITIALIZING.

However, the jobs then stay INITIALIZING for a very long time. Sometimes they do start, sometimes they fail with the error:
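
To make the numbers in the log message concrete, here is a small sketch that compares the rejected payload size with the frame size limit. The class and method names are hypothetical (they are not Flink APIs); only the two byte counts come from the log line above, and the "<bytes>b" value format is assumed from the default shown in that message.

```java
// Illustrative helper only: FrameSizeCheck is a hypothetical name, not a Flink class.
class FrameSizeCheck {
    // Both values are copied from the "Discarding oversized payload" log message.
    static final long DEFAULT_FRAME_SIZE_BYTES = 10_485_760L; // 10 MiB
    static final long OBSERVED_PAYLOAD_BYTES = 83_938_405L;

    private static final long MIB = 1024L * 1024L;

    /** True if a payload of the given size is not larger than the frame size. */
    static boolean fits(long payloadBytes, long frameSizeBytes) {
        return payloadBytes <= frameSizeBytes;
    }

    /** Smallest whole number of MiB that can hold the payload (rounded up). */
    static long requiredMiB(long payloadBytes) {
        return (payloadBytes + MIB - 1) / MIB;
    }

    /** Formats a size in MiB as a byte count with a "b" suffix, e.g. "10485760b". */
    static String toConfigValue(long sizeMiB) {
        return (sizeMiB * MIB) + "b";
    }

    public static void main(String[] args) {
        // The observed payload does not fit in the default frame.
        System.out.println(fits(OBSERVED_PAYLOAD_BYTES, DEFAULT_FRAME_SIZE_BYTES)); // false
        // It needs at least 81 MiB, so a 100 MiB frame is large enough.
        System.out.println(requiredMiB(OBSERVED_PAYLOAD_BYTES)); // 81
        System.out.println(toConfigValue(100)); // 104857600b
    }
}
```

This only shows why 100MB was enough to make the timeout disappear for this particular message; it says nothing about why the message is that large.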

java.lang.OutOfMemoryError: Java heap space

Increasing the memory of the taskmanager helped for some jobs but not all. Overall, they seem to require a lot more memory and take a very long time to initialize. Sometimes we have a connection reset from S3:

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
...
Caused by: java.lang.IllegalStateException: Connection pool shut down

New observations (08/02/2023): we discovered that the problematic jobs have a very large _metadata file in their checkpoint (168MB for the largest). Worse, it seems to double in size every time the job is resumed from its checkpoint: the jump appears in the first checkpoint taken after the restart, and the following checkpoints then stay at that size.
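
To give a feel for how quickly this kind of doubling crosses the 10485760-byte frame limit reported in the jobmanager message, here is a small arithmetic sketch. The starting size of 100 KiB is an assumption chosen purely for illustration (the original size of the file was not measured), and the class name is hypothetical.

```java
// Illustrative arithmetic only: MetadataGrowth is a hypothetical name.
class MetadataGrowth {
    /**
     * Returns how many times startBytes must be doubled before it becomes
     * strictly larger than limitBytes.
     */
    static int doublingsToExceed(long startBytes, long limitBytes) {
        if (startBytes <= 0) {
            throw new IllegalArgumentException("startBytes must be positive");
        }
        int doublings = 0;
        long size = startBytes;
        while (size <= limitBytes) {
            size *= 2;
            doublings++;
        }
        return doublings;
    }

    public static void main(String[] args) {
        long assumedStart = 100L * 1024L;         // ASSUMPTION: 100 KiB initial _metadata
        long frameLimit = 10_485_760L;            // limit from the log message
        long largestSeen = 168L * 1024L * 1024L;  // 168MB, taken as MiB here

        System.out.println(doublingsToExceed(assumedStart, frameLimit));  // 7
        System.out.println(doublingsToExceed(assumedStart, largestSeen)); // 11
    }
}
```

Under that assumed starting size, a handful of restores is enough to exceed the frame limit, which would be consistent with the problem appearing days or weeks after the migration rather than immediately.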

Questions

  • What could cause Akka messages that big when submitting a task?
  • Did something change between Flink 1.13 and Flink 1.15 that could explain those issues?
  • How can we determine what is taking all the heap memory?

Solution

  • Though we did not understand everything, we found where the problem came from and managed to fix it.

    TL;DR: The topic-partition-offset-states key was kept in the job state (checkpoints) when we switched from FlinkKafkaConsumer to KafkaSource. Though it wasn't used anymore, it grew exponentially, so we removed it from the checkpoints (using custom Java code) and restarted everything.

    In detail:

    • We switched from FlinkKafkaConsumer to KafkaSource. We made sure that the offsets were committed and used setStartingOffsets(OffsetsInitializer.committedOffsets()) when resuming the job from the savepoint (as explained in https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer). That did work, and our jobs resumed correctly in Flink 1.15 with correct offsets and a state that seemed good.
    • However, it looks like the source operators kept the topic-partition-offset-states key in their state. This was used by FlinkKafkaConsumer, but it is not used by KafkaSource.
    • For some reason (that we could not determine), the list of offsets in topic-partition-offset-states sometimes doubled in length when our jobs were recovered (we use HA on Kubernetes, so this can happen regularly if we restart Flink).
    • After some time, this list of offsets became so big that our _metadata files in the checkpoints became very big (168MB). This led to Akka timeouts as they exceeded the akka.framesize. Increasing the framesize helped, but increased the memory pressure, causing many heap memory errors. Besides, it just made the problem worse as the _metadata kept doubling in size beyond 10MB.
    • The problem was the same for the completedCheckpoint files in the high availability storage directory.
    • To fix that, we had to:
      • Deserialize the CompletedCheckpoint files.
      • Update them to remove the topic-partition-offset-states key from the states (making those files much smaller).
      • Re-serialize them and replace the original files.
    • Upon restart of the taskmanagers and jobmanagers, the jobs loaded correctly. After they wrote their first checkpoint, the _metadata files were back to a reasonable size.
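
The deserialize / remove / re-serialize steps above can be sketched as follows. This is not the actual fix code: a real CompletedCheckpoint has to be read and written with Flink's own classes and serializers. Here a plain HashMap serialized with standard Java serialization stands in for the operator state, and every name except the topic-partition-offset-states key is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Stand-in sketch: the HashMap plays the role of the state stored in a checkpoint file.
class StripStateKey {
    static final String OBSOLETE_KEY = "topic-partition-offset-states";

    /** Serializes the stand-in state to bytes (the "file" content). */
    static byte[] serialize(HashMap<String, long[]> state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and therefore flushed) before the bytes are read.
        return bytes.toByteArray();
    }

    /** Reads the stand-in state back from bytes. */
    @SuppressWarnings("unchecked")
    static HashMap<String, long[]> deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (HashMap<String, long[]>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Step 1: deserialize. Step 2: drop the key. Step 3: re-serialize. */
    static byte[] stripKey(byte[] original, String key) {
        HashMap<String, long[]> state = deserialize(original);
        state.remove(key);
        return serialize(state);
    }

    public static void main(String[] args) {
        HashMap<String, long[]> state = new HashMap<>();
        state.put(OBSOLETE_KEY, new long[100_000]); // bloated, unused list of offsets
        state.put("other-state", new long[8]);      // hypothetical state that must survive

        byte[] before = serialize(state);
        byte[] after = stripKey(before, OBSOLETE_KEY);
        System.out.println("before: " + before.length + " bytes, after: " + after.length + " bytes");
    }
}
```

In a real cleanup, the result would replace the original file only after keeping a backup of it, since a corrupted checkpoint cannot be restored.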