Tags: java, kubernetes, akka, apache-flink, flink-batch

Flink: TaskManager cannot connect to the JobManager - Could not resolve ResourceManager address


I'm using the Apache Flink Kubernetes operator to deploy a standalone job on an Application cluster setup.

I have set up the following files based on the official Flink documentation - Link

  1. jobmanager-application-non-ha.yaml
  2. taskmanager-job-deployment.yaml
  3. flink-configuration-configmap.yaml
  4. jobmanager-service.yaml

I have not changed any of the configurations in these files and am trying to run a simple WordCount example from the Flink examples using the Apache Flink Operator.

After running the kubectl commands to set up the job manager and the task manager, the job manager goes into a NotReady state while the task manager goes into CrashLoopBackOff.

NAME                                         READY   STATUS             RESTARTS        AGE
flink-jobmanager-28k4b                       1/2     NotReady           2 (4m24s ago)   16m
flink-kubernetes-operator-6585dddd97-9hjp4   2/2     Running            0               10d
flink-taskmanager-6bb88468d7-ggx8t           1/2     CrashLoopBackOff   9 (2m21s ago)   15m

The job manager logs look like this:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
    at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[flink-rpc-akka_be40712e-8b2e-47cd-baaf-f0149cf2604d.jar:1.16.0]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_be40712e-8b2e-47cd-baaf-f0149cf2604d.jar:1.16.0]

The task manager, it seems, cannot connect to the job manager:

2023-01-28 19:21:47,647 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to ResourceManager akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000).
2023-01-28 19:21:57,766 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
2023-01-28 19:22:08,036 INFO  akka.remote.transport.ProtocolStateActor                     [] - No response from remote for outbound association. Associate timed out after [20000 ms].
2023-01-28 19:22:08,057 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flink-jobmanager:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2023-01-28 19:22:08,069 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
2023-01-28 19:22:08,308 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [null] failed with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: flink-jobmanager/100.127.18.9:6123

The flink-configuration-configmap.yaml looks like this

  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2    

This is what the pom.xml looks like - Link


Solution

  • You deployed the Kubernetes Operator in the namespace, but you did not create the custom resource (a FlinkDeployment or FlinkSessionJob) that the Operator acts on. Instead, you tried to create a standalone Flink Kubernetes cluster, which the Operator does not manage.

    The Flink Operator makes it a lot easier to deploy your Flink jobs: you only need to deploy the operator itself and a FlinkDeployment or FlinkSessionJob resource. The operator then manages the deployment for you.

    Please use this documentation for the Kubernetes Operator: Link
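
    For illustration, a FlinkDeployment for an Application-mode job might look roughly like the sketch below. It is modeled on the basic example in the operator documentation and is not the asker's actual setup: the resource name, the image tag, the flink service account, and the path to the WordCount jar inside the image are all assumptions to adjust for your environment.

```yaml
# Minimal sketch of a FlinkDeployment (Application mode).
# Assumptions: name, image tag, service account, and jar path are illustrative.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: wordcount-example          # hypothetical name
spec:
  image: flink:1.16                # assumed to match the 1.16.0 seen in the logs
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"   # same value as in the question's ConfigMap
  serviceAccount: flink            # assumed; must exist in the target namespace
  jobManager:
    resource:
      memory: "1600m"              # mirrors jobmanager.memory.process.size
      cpu: 1
  taskManager:
    resource:
      memory: "1728m"              # mirrors taskmanager.memory.process.size
      cpu: 1
  job:
    # Assumed location of the bundled example jar inside the image
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    parallelism: 2
    upgradeMode: stateless
```

    With a resource like this applied via kubectl, the operator creates the job manager and task manager pods itself, so the four standalone manifests from the question should not be needed.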