We use Flink to run a number of streaming jobs that read from Kafka, apply a SQL transformation, and write the output back to Kafka. Everything runs on Kubernetes with two jobmanagers and many taskmanagers. Our jobs use checkpointing with RocksDB, and the checkpoints are written to a bucket in AWS S3.
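For context, a stripped-down version of the kind of job we run looks like the sketch below (the bucket, topics, fields and SQL statement are placeholders, not our actual setup):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaToKafkaJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every minute, with incremental RocksDB checkpoints written to S3.
        env.enableCheckpointing(60_000);
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.getCheckpointConfig().setCheckpointStorage("s3://example-bucket/flink-checkpoints");

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source and sink tables on Kafka, plus the SQL transformation between them.
        tEnv.executeSql("CREATE TABLE source_topic (id STRING, name STRING) WITH ("
                + " 'connector' = 'kafka',"
                + " 'topic' = 'source_topic',"
                + " 'properties.bootstrap.servers' = 'kafka:9092',"
                + " 'properties.group.id' = 'example-group',"
                + " 'scan.startup.mode' = 'group-offsets',"
                + " 'format' = 'json')");
        tEnv.executeSql("CREATE TABLE sink_topic (id STRING, name STRING) WITH ("
                + " 'connector' = 'kafka',"
                + " 'topic' = 'sink_topic',"
                + " 'properties.bootstrap.servers' = 'kafka:9092',"
                + " 'format' = 'json')");
        tEnv.executeSql("INSERT INTO sink_topic SELECT id, UPPER(name) FROM source_topic");
    }
}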
Recently, we upgraded from Flink 1.13.1 to Flink 1.15.2. We used the savepoint mechanism to stop our jobs and restart them on the new version. We have two Kubernetes clusters. Right after the migration, everything seemed fine on both of them. But some time later (almost a month for the first cluster, 2 or 3 days for the second one), we started to see new problems, which may or may not be related to the migration to Flink 1.15 since they appeared afterwards.
We noticed that a few jobs failed to start: the "Source" tasks in the execution graph stay CREATED while all the tasks further down the graph (ChangelogNormalize, Writer) are RUNNING. The jobs restart regularly with the following error (stack trace simplified for readability):
java.lang.Exception: Cannot deploy task Source: source_consumer -> *anonymous_datastream_source$81*[211] (1/8) (de8f109e944dfa92d35cdc3f79f41e6f) - TaskManager (<address>) not responding after a rpcTimeout of 10000 ms
at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:602)
...
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [akka.tcp://flink@<address>/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:580)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@<address>/user/rpc/taskmanager_0#1723317240]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
We also noticed this message in the jobmanager logs:
Discarding oversized payload sent to Actor[akka.tcp://flink@<address>/user/rpc/taskmanager_0#1153219611]: max allowed size 10485760b bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 83938405 bytes.
It is not clear to us why such big Akka messages are sent. But when we set akka.framesize to a higher value (100 MB), the timeout indeed disappears, and the tasks that were stuck in CREATED now reach INITIALIZING.
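For reference, we raised the limit in flink-conf.yaml roughly like this (100 MB in our case; the default is 10485760b, i.e. 10 MiB):

akka.framesize: 104857600b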
However, the jobs then stay INITIALIZING for a very long time. Sometimes they do start; sometimes they fail with the error:
java.lang.OutOfMemoryError: Java heap space
Increasing the taskmanagers' memory helped for some jobs but not all. Overall, the jobs seem to require a lot more memory than before and take a very long time to initialize. Sometimes we also get S3 connection errors:
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
...
Caused by: java.lang.IllegalStateException: Connection pool shut down
New observations (08/02/2023): we discovered that the problematic jobs have a very large _metadata file in their checkpoints (168 MB for the largest). Worse, it seems to double in size every time the job is resumed from a checkpoint (the doubling happens at the first checkpoint taken after the restart; the following checkpoints then stay at that size).
Though we did not understand everything, we found where the problem came from and managed to fix it.

TL;DR: The topic-partition-offset-states key was kept in the job state (checkpoints) when we switched from FlinkKafkaConsumer to KafkaSource. Although it was no longer used, it grew exponentially, so we removed it from the checkpoints (using custom Java code) and restarted everything.
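For reference, the switch to KafkaSource looked roughly like the sketch below (broker, topic and group id are placeholders; the group id must match the one the old FlinkKafkaConsumer committed offsets with):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class MigratedSource {

    // Replacement for the deprecated FlinkKafkaConsumer: start from the offsets
    // committed by the old consumer group so the job carries on where it left off.
    public static KafkaSource<String> create() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("source_topic")
                .setGroupId("example-group")
                .setStartingOffsets(OffsetsInitializer.committedOffsets())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}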
In more detail:

1. When migrating to Flink 1.15, we replaced the deprecated FlinkKafkaConsumer with KafkaSource and used setStartingOffsets(OffsetsInitializer.committedOffsets()) when resuming the job from the savepoint (as explained in https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer, and sketched above). That did work, and our jobs resumed correctly in Flink 1.15 with correct offsets and a state that seemed good.
2. However, the jobs kept the now-unused topic-partition-offset-states key in their state. It was used by FlinkKafkaConsumer, but it is not used by KafkaSource.
3. For a reason we could not pin down, topic-partition-offset-states sometimes doubled in length when our jobs were recovered (we use HA on Kubernetes, so this can happen regularly if we restart Flink).
4. As that state grew, the _metadata files in the checkpoints became very big (168 MB). This led to Akka timeouts as they exceeded the akka.framesize. Increasing the framesize helped, but also increased the memory pressure, causing many heap memory errors. Besides, it just made the problem worse, as the _metadata kept doubling in size beyond 10 MB.
5. The same happened to the completedCheckpoint files in the high-availability storage directory.
6. To fix this, we removed the topic-partition-offset-states key from the states with custom Java code, which made those files much smaller (see the sketch after this list).
7. After restarting the jobs from the cleaned-up states, the _metadata files were back to a reasonable size.
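For anyone hitting the same issue: we will not present our exact cleanup code as a recipe, but as an illustration, a small tool along these lines (built on Flink's internal checkpoint classes, whose API may differ between versions) can at least confirm which state name is bloating a checkpoint before you attempt any surgery. Work on a local copy of the _metadata file and keep a backup.

import java.io.DataInputStream;
import java.io.FileInputStream;

import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.OperatorStateHandle;

// Prints the managed operator state names referenced by a checkpoint _metadata file,
// so you can check whether topic-partition-offset-states is the entry causing the bloat.
// Run it with the same Flink version as the cluster, since it uses internal classes.
public class CheckpointMetadataInspector {

    public static void main(String[] args) throws Exception {
        String path = args[0]; // local copy of the _metadata file

        CheckpointMetadata metadata;
        try (DataInputStream in = new DataInputStream(new FileInputStream(path))) {
            metadata = Checkpoints.loadCheckpointMetadata(
                    in, CheckpointMetadataInspector.class.getClassLoader(), path);
        }

        for (OperatorState operatorState : metadata.getOperatorStates()) {
            System.out.println("operator " + operatorState.getOperatorID()
                    + " (parallelism " + operatorState.getParallelism() + ")");
            for (OperatorSubtaskState subtask : operatorState.getStates()) {
                for (OperatorStateHandle handle : subtask.getManagedOperatorState()) {
                    System.out.println("  managed operator state names: "
                            + handle.getStateNameToPartitionOffsets().keySet());
                }
            }
        }
    }
}

The removal itself rewrote those files with the offending state name dropped (the writing side can go through the storeCheckpointMetadata counterpart in the same Checkpoints class). Since all of this relies on Flink internals, treat it as a last resort and keep the original files until the jobs are confirmed healthy.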