Tags: kubernetes, apache-flink, flink-streaming

Flink Kubernetes operator | how to pass s3 key/secrets to flink-conf.yaml


I'm using the Flink Kubernetes operator (in an AWS EKS cluster) to deploy a Flink job, and I'm trying to set up an S3 location for checkpoints and savepoints. I have to pass the S3 access/secret keys to flink-conf.yaml, but I don't want to put plain-text credentials in the config and commit them to git.

apiVersion: v1
kind: Secret
metadata:
  namespace: flink-stage
  name: flink-secrets
type: Opaque
data:
  # base64 encoded!
  s3.access_key: <key>
  s3.secret_key: <secret>

How do I refer to the secrets in the config YAML? I've tried using a secretKeyRef, but it doesn't work:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: nrt-sessionizer
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    s3.access-key:
      valueFrom:
        secretKeyRef:
          key: s3.access_key
          name: flink-secrets
    s3.secret-key:
      valueFrom:
        secretKeyRef:
          key: s3.secret_key
          name: flink-secrets

error: error validating "infra/flink-k8/overlays/prod": error validating data:
 [ValidationError(FlinkDeployment.spec.flinkConfiguration.s3.access-key): invalid type for org.apache.flink.v1beta1.FlinkDeployment.spec.flinkConfiguration: got "map", expected "string",
 ValidationError(FlinkDeployment.spec.flinkConfiguration.s3.secret-key): invalid type for org.apache.flink.v1beta1.FlinkDeployment.spec.flinkConfiguration: got "map", expected "string"]; 
if you choose to ignore these errors, turn validation off with --validate=false

Note: changing the contents of flink-conf.yaml from an initContainer doesn't work because the ConfigMap is mounted read-only.

Please suggest how to pass secrets to flink-conf.yaml.


Solution

  • I resolved this by setting the config below in the FlinkDeployment YAML. The Kubernetes pods run in EKS with an assumed IAM role, so instead of passing a key/secret we can tell Flink to use the credentials provider class below, which assumes the role named in the pod's environment variables.

    NOTE: You can also use other credentials provider classes from the AWS SDK and avoid putting the key/secret in the YAML.

    fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
    

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: nrt-sessionizer
    spec:
      image: flink:1.17
      flinkVersion: v1_17
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "4"
        fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
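
    For completeness, here is a rough sketch of how the role could be wired to the pods, assuming the cluster uses IAM Roles for Service Accounts (IRSA) and that the S3 filesystem plugin (flink-s3-fs-hadoop) is available in the image. The ServiceAccount name, the namespace, the role ARN, and the bucket paths are placeholders I made up for illustration; they are not from the original setup.

    ```yaml
    # Sketch only: the ServiceAccount name, role ARN, and bucket are hypothetical placeholders.
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: flink
      namespace: flink-stage
      annotations:
        # With IRSA, EKS injects AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE
        # into pods that run under this ServiceAccount.
        eks.amazonaws.com/role-arn: "arn:aws:iam::<account-id>:role/<flink-s3-role>"
    ---
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: nrt-sessionizer
      namespace: flink-stage
    spec:
      image: flink:1.17
      flinkVersion: v1_17
      # Run the JobManager and TaskManager pods under the annotated ServiceAccount.
      serviceAccount: flink
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "4"
        # Reads the role ARN and token file from the environment variables above.
        fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
        # Placeholder locations; the s3a:// scheme matches the fs.s3a.* setting.
        state.checkpoints.dir: "s3a://<bucket>/checkpoints"
        state.savepoints.dir: "s3a://<bucket>/savepoints"
    ```

    The ServiceAccount still needs whatever Kubernetes RBAC permissions the operator expects for Flink pods, and the IAM role needs read/write access to the bucket. If the operator's installation already created a ServiceAccount for jobs, annotating that one instead of creating a new one should work the same way.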