Tags: kubernetes, apache-flink, kubernetes-helm, linkerd

Flink, Kubernetes, and Linkerd


I am deploying some Flink jobs that need to access services inside a service mesh implemented with Linkerd, and I'm running into this error:

java.lang.NoClassDefFoundError: Could not initialize class foo.bar.Job 

I have confirmed that the jar file contains the class that apparently cannot be found, so the jar itself does not seem to be the problem; the error seems to be related to Linkerd. In particular, I'm using the following pod annotations for both the jobmanager and the taskmanager pods (taken from my Helm chart values file):

podAnnotations:
  linkerd.io/inject: enabled
  config.linkerd.io/skip-outbound-ports: 6123,6124
  config.linkerd.io/proxy-await: enabled

For what it's worth, I'm using the Ververica Platform (Community Edition) for deploying my jobs to Kubernetes, although I don't think the issue is VVP-specific:

{{- define "vvp.deployment" }}
kind: Deployment
apiVersion: v1
metadata:
  name: my-job
spec:
  template:
    spec:
      artifact:
        kind: jar
        flinkImageRegistry: {{ .Values.flink.imageRegistry }}
        flinkVersion: "1.15.1"
        flinkImageTag: 1.15.1-stream1-scala_2.12-java11-linkerd
        entryClass: foo.bar.Job
      kubernetes:
        jobManagerPodTemplate:
          metadata:
            {{- with .Values.flink.podAnnotations }}
            annotations:
              {{- toYaml . | nindent 14 }}
            {{- end }}
          spec:
            containers:
              - name: flink-jobmanager
                command:
                  - linkerd-entrypoint.sh
        taskManagerPodTemplate:
          metadata:
            {{- with .Values.flink.podAnnotations }}
            annotations:
              {{- toYaml . | nindent 14 }}
            {{- end }}
{{- end }}

where the contents of linkerd-entrypoint.sh are:

#!/bin/bash
set -e
exec linkerd-await --shutdown -- "$@"

For extra context, the VVP and the Flink jobs are deployed into different namespaces. Also, I'm not using any Linkerd annotations whatsoever for the VVP pods.

Has anyone encountered similar problems? The closest troubleshooting resource/guide that I've found so far is this one, which targets Istio instead of Linkerd.


Solution

  • Answering my own question now that I have determined the root cause of the issue.

    Regarding Linkerd, everything was set up correctly. The main precaution is to add the linkerd-await binary to the Flink image and to override the entrypoint of the jobmanager; otherwise you will run into issues when upgrading your jobs. When the jobmanager exits, nothing terminates the Linkerd proxy sidecar, so the pod hangs around with a NotReady status. This is easily solved by wrapping the main command in a linkerd-await call. So, first add the linkerd-await binary to your Docker image:

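    # Add linkerd-await and linkerd-entrypoint.sh
    USER root
    RUN apt-get update \
        && apt-get install -y --no-install-recommends wget ca-certificates \
        && rm -rf /var/lib/apt/lists/*
    # Install both files into a directory on PATH so the pod's `command` (and the script itself) can find them
    RUN wget https://github.com/linkerd/linkerd-await/releases/download/release%2Fv0.2.7/linkerd-await-v0.2.7-amd64 -O /usr/local/bin/linkerd-await \
        && chmod +x /usr/local/bin/linkerd-await
    COPY scripts/flink/linkerd-entrypoint.sh /usr/local/bin/linkerd-entrypoint.sh
    RUN chmod +x /usr/local/bin/linkerd-entrypoint.sh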
    

    Then, for the jobmanager only, override the entrypoint like this:

    spec:
      containers:
        - name: flink-jobmanager
          command:
            - linkerd-entrypoint.sh # defined above
    

    Alternatively, you can set the LINKERD_DISABLED or LINKERD_AWAIT_DISABLED environment variable to bypass the linkerd-await wrapper. For more information on using Jobs with Linkerd, consult the following resources:

    Also, regarding the annotation

    config.linkerd.io/proxy-await: enabled

    it only handles the waiting part, not the shutdown part. So if we are going to run linkerd-await --shutdown -- "$@" manually anyway, the annotation is redundant and can safely be removed.

    Finally, regarding:

    java.lang.NoClassDefFoundError: Could not initialize class foo.bar.Job 
    

    let me clarify that this had nothing to do with Linkerd. It was a configuration error: essentially (the specific details are irrelevant), some environment variables were missing in the taskmanager pods.

    Note that the exception message says "Could not initialize class foo.bar.Job", which is different from the class not being found at all: the class was found, but its static initialization had already failed.
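    The answer does not say which environment variables were missing or how foo.bar.Job used them, so the following is only a minimal, Flink-free sketch under one assumption: that the job class reads a required environment variable while initializing a static field. It shows why the JVM reports "Could not initialize class" instead of a missing class. The first access fails with ExceptionInInitializerError (wrapping the original exception), the class is then left in an erroneous state, and every later access in the same JVM fails with NoClassDefFoundError. The names InitFailureDemo, Job, and HYPOTHETICAL_REQUIRED_SETTING are illustrative and not part of the original setup.

```java
import java.util.ArrayList;
import java.util.List;

public class InitFailureDemo {

    // Hypothetical stand-in for foo.bar.Job: a static field that depends on an env var.
    static class Job {
        // HYPOTHETICAL_REQUIRED_SETTING is an illustrative name; the post does not
        // say which environment variables were actually missing.
        static final String SETTING = requireEnv("HYPOTHETICAL_REQUIRED_SETTING");

        private static String requireEnv(String name) {
            String value = System.getenv(name);
            if (value == null) {
                // Throwing here makes the static initialization of Job fail.
                throw new IllegalStateException("missing environment variable " + name);
            }
            return value;
        }

        static String setting() {
            return SETTING;
        }
    }

    // Touches Job `attempts` times and records what each access produced.
    static List<String> probe(int attempts) {
        List<String> outcomes = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            try {
                outcomes.add("ok: " + Job.setting());
            } catch (Throwable t) {
                // First access in this JVM: ExceptionInInitializerError.
                // Any later access: NoClassDefFoundError "Could not initialize class ...".
                outcomes.add(t.toString());
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        for (String outcome : probe(2)) {
            System.out.println(outcome);
        }
    }
}
```

    Run without HYPOTHETICAL_REQUIRED_SETTING set, the first printed line should report an ExceptionInInitializerError and the second a NoClassDefFoundError for InitFailureDemo$Job. In a real job log the later NoClassDefFoundError may be the most visible error, so it can pay off to search for the earlier ExceptionInInitializerError, whose cause names the actual problem.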

    Sorry for the confusion!