Tags: apache-flink, flink-streaming

Flink Fencing Errors in K8s HA mode


I am using Flink 1.12 and trying to run the JobManager in high-availability mode on a Kubernetes cluster (AKS). I am running two JobManager pods and two TaskManager pods.

The problem I am facing is that the TaskManagers are not able to find the JobManager leader.

The reason is that they address the JobManager through the K8s "Service" for the JobManager (a ClusterIP Service) instead of the leader's pod IP. The Service therefore sometimes routes the registration call to the standby JobManager, which leaves the TaskManager unable to find the leader.
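
You can see that the Service load-balances across both JobManager pods by listing its endpoints (a sketch; the Service name flink-jobmanager is the one mentioned below, and the pod IPs shown are illustrative):

kubectl get endpoints flink-jobmanager
# NAME               ENDPOINTS                           AGE
# flink-jobmanager   10.244.1.23:6123,10.244.2.17:6123   2d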

Here are the contents of the jobmanager-leader ConfigMap:

{
    "apiVersion": "v1",
    "data": {
        "address": "akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2",
        "sessionId": "ee14c446-82b0-45ab-b470-ee445ddd0e0f"
    },
    "kind": "ConfigMap",
    "metadata": {
        "annotations": {
            "control-plane.alpha.kubernetes.io/leader": "{\"holderIdentity\":\"e6a42a4f-235e-4b97-93c6-40f4b987f56b\",\"leaseDuration\":15.000000000,\"acquireTime\":\"2021-02-16T05:13:37.365000Z\",\"renewTime\":\"2021-02-16T05:22:17.386000Z\",\"leaderTransitions\":105}"
        },
        "creationTimestamp": "2021-02-15T16:13:26Z",
        "labels": {
            "app": "flinktestk8cluster",
            "configmap-type": "high-availability",
            "type": "flink-native-kubernetes"
        },
        "name": "flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader",
        "namespace": "default",
        "resourceVersion": "46202881",
        "selfLink": "/api/v1/namespaces/default/configmaps/flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader",
        "uid": "1d5ca6e3-dc7e-4fb7-9fab-c1bbb956cda9"
    }
}

Here flink-jobmanager is the name of the K8s Service for the JobManager.
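
For reference, the published leader address can be read straight out of that ConfigMap (the name is copied from the metadata above; adjust the namespace if yours differs):

kubectl -n default get configmap \
  flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader \
  -o jsonpath='{.data.address}'
# prints: akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2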

Is there a way to fix this? How do I make the JobManager write the pod IP to the leader ConfigMap instead of the Service name?

Here is the exception:

2021-02-12 06:15:53,849 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Registration at ResourceManager failed due to an error
java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(954fe694bb4d268a2e32b4497e944144, RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration, Time))) sent to akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 because the fencing token is null.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_275]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_275]
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1044) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.OnComplete.internal(Future.scala:263) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.OnComplete.internal(Future.scala:261) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:101) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:999) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(954fe694bb4d268a2e32b4497e944144, RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration, Time))) sent to akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 because the fencing token is null.
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 9 more
2021-02-12 06:15:53,849 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Pausing and re-attempting registration in 10000 ms

Solution

  • The problem is that your JobManager pods need unique addresses when you run standby JobManagers. Hence, you must not configure a Service for the components to communicate with each other through. Instead, you should start each JobManager pod with its pod IP as its jobmanager.rpc.address.

    In order to start each JobManager pod with its own IP, you cannot put the Flink configuration into a shared ConfigMap, because that would give every JobManager pod the same configuration. Instead, add the following snippet to your JobManager deployment:

    env:
      - name: MY_POD_IP
        valueFrom:
          fieldRef:
            fieldPath: status.podIP
      - name: FLINK_PROPERTIES
        value: |
          jobmanager.rpc.address: ${MY_POD_IP}
    

    That way each JobManager pod uses its own pod IP for jobmanager.rpc.address, and this address is what gets written to the K8s HA ConfigMaps. Once this is done, every user of the K8s HA services that runs inside the cluster can find the current leader.
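
    A quick way to check later that this took effect is to read the leader ConfigMap again: the address should now contain a pod IP instead of the Service name. The ConfigMap name is derived from your kubernetes.cluster-id (foobar in these examples); <job-id> is a placeholder and the IP shown is illustrative:

    kubectl get configmap foobar-<job-id>-jobmanager-leader -o jsonpath='{.data.address}'
    # e.g. akka.tcp://flink@10.244.1.23:6123/user/rpc/jobmanager_2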

    Next, you need to configure all deployments that should use the K8s HA services. You can do this by extending the FLINK_PROPERTIES env variable:

    env:
      - name: MY_POD_IP
        valueFrom:
          fieldRef:
            fieldPath: status.podIP
      - name: FLINK_PROPERTIES
        value: |
          jobmanager.rpc.address: ${MY_POD_IP}
          kubernetes.cluster-id: foobar
          high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
          high-availability.storageDir: file:///flink/recovery
          restart-strategy: fixed-delay
          restart-strategy.fixed-delay.attempts: 10
    

    Adding this to your JobManager pod definition and

    env:
      - name: FLINK_PROPERTIES
        value: |
          kubernetes.cluster-id: foobar
          high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
          high-availability.storageDir: file:///flink/recovery
          restart-strategy: fixed-delay
          restart-strategy.fixed-delay.attempts: 10
    

    to your TaskManager deployment should solve the problem.
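
    A note on why ${MY_POD_IP} is resolved at all: the official Flink image's docker-entrypoint.sh appends FLINK_PROPERTIES to conf/flink-conf.yaml and then runs envsubst over the file before starting the process. A simplified sketch of that behavior (not the full script):

    # simplified sketch of the relevant part of the image's entrypoint
    CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    # environment variables such as ${MY_POD_IP} are substituted here
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"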

    The full deployment YAMLs are below (JobManager first, then TaskManager):

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: flink:1.12.1
            args: ["jobmanager"]
            env:
              - name: MY_POD_IP
                valueFrom:
                  fieldRef:
                    fieldPath: status.podIP
              - name: FLINK_PROPERTIES
                value: |
                  jobmanager.rpc.address: ${MY_POD_IP}
                  kubernetes.cluster-id: foobar
                  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
                  high-availability.storageDir: file:///flink/recovery
                  restart-strategy: fixed-delay
                  restart-strategy.fixed-delay.attempts: 10
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
    
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: flink:1.12.1
            args: ["taskmanager"]
            env:
              - name: FLINK_PROPERTIES
                value: |
                  kubernetes.cluster-id: foobar
                  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
                  high-availability.storageDir: file:///flink/recovery
                  restart-strategy: fixed-delay
                  restart-strategy.fixed-delay.attempts: 10
            ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            - containerPort: 6121
              name: metrics
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
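
    Assuming you save the two manifests as jobmanager-deployment.yaml and taskmanager-deployment.yaml (hypothetical file names), you can roll everything out and confirm that registration now succeeds:

    kubectl apply -f jobmanager-deployment.yaml -f taskmanager-deployment.yaml
    # the question runs two JobManagers, so scale up to get a standby
    kubectl scale deployment/flink-jobmanager --replicas=2
    # a healthy run logs a successful TaskExecutor registration instead of the FencingTokenException
    kubectl logs -f deployment/flink-taskmanager | grep -i registration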