Tags: azure-storage, apache-flink, azure-aks

Azure Flink checkpointing to Azure Storage: No credentials found for account


I have a test Flink app that I am trying to run on Azure Kubernetes Service, connected to Azure Storage. In the app I have set the following configuration:

Configuration cfg = new Configuration();
cfg.setString("fs.azure.account.key.<storage-account>.blob.core.windows.net", "<access-key>");
FileSystem.initialize(cfg, null);

I have also enabled checkpointing as follows:

env.enableCheckpointing(10000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("wasbs://<container>@<storage-account>.blob.core.windows.net/checkpoint/");

The storage account has been created on the Azure Portal. I have used the Access Key in the code above.

When I deploy the app to Kubernetes, the JobManager runs and creates the checkpoint folder in the Azure Storage container; however, the Block blob data is always 0B, and the app continuously throws the exception below.

The full error I am getting is:

Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureException: No credentials found for account <storage-account>.blob.core.windows.net in the configuration, and its container <container> is not accessible using anonymous credentials. Please check if the container exists first. If it is not publicly available, you have to provide account credentials.

org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.StorageException: Public access is not permitted on this storage account

The part that has me scratching my head (apart from the fleas) is that it does create the checkpoint folders and files, and it continues to create further checkpoints.

This account is not publicly accessible and company policy has restricted enabling public access.

I also tried configuring this through the flink-conf.yaml instead; this was my configuration:

state.backend: rocksdb
state.checkpoints.dir: wasbs://<container>@<storage-account>.blob.core.windows.net/checkpoint/
fs.azure.account.key.flinkstorage.blob.core.windows.net: <access-key>
fs.azure.account.key.<storage-account>.blob.core.windows.net: <access-key>

I tried both account.key options above, and the wasb:// protocol as well. I also tried rotating the access keys on Azure Storage, all resulting in the same errors.
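The exception complains that no credentials were found for <storage-account>.blob.core.windows.net, so one thing worth ruling out is a mismatch between the account host in the checkpoint URI and the fs.azure.account.key.* property name you configured. Here is a minimal sketch of that sanity check (plain Java; AzureKeyCheck and expectedKeyProperty are hypothetical names for illustration, not Flink APIs):

```java
import java.net.URI;

public class AzureKeyCheck {
    // Derives the property name that must hold the access key for the
    // storage account referenced by a wasb(s):// checkpoint URI.
    // For wasbs://<container>@<account>.blob.core.windows.net/path the
    // URI host is "<account>.blob.core.windows.net".
    static String expectedKeyProperty(String checkpointUri) {
        URI uri = URI.create(checkpointUri);
        return "fs.azure.account.key." + uri.getHost();
    }

    public static void main(String[] args) {
        String dir = "wasbs://mycontainer@mystorage.blob.core.windows.net/checkpoint/";
        // The flink-conf.yaml (or Configuration) entry must use exactly this name:
        System.out.println(expectedKeyProperty(dir));
        // -> fs.azure.account.key.mystorage.blob.core.windows.net
    }
}
```

If the printed property name differs from the one in your configuration (for example a bare account name instead of the full <account>.blob.core.windows.net host), the filesystem cannot find the key and reports the same "No credentials found for account" error as above.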


Solution

  • I eventually got this working by moving all of my checkpointing configuration into the flink-conf.yaml and removing every reference to checkpointing from my code, i.e. from the StreamExecutionEnvironment.

    My flink-conf.yaml looks like this:

    execution.checkpointing.interval: 10s
    execution.checkpointing.mode: EXACTLY_ONCE
    state.backend: rocksdb
    state.checkpoints.dir: wasbs://<container>@<storage-account>.blob.core.windows.net/checkpoint/
    
    # azure storage access key
    fs.azure.account.key.psbombb.blob.core.windows.net: <access-key>
    

    Checkpoints are now being written to Azure Storage, and the metadata files are no longer 0B.

    I deployed my Flink cluster to Kubernetes as follows with Azure Storage plugins enabled:

    ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<cluster-name> -Dkubernetes.namespace=<your-namespace> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-azure-fs-hadoop-1.14.0.jar -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-azure-fs-hadoop-1.14.0.jar
    

    I then deployed the job to the Flink cluster as follows:

    ./bin/flink run --target kubernetes-session -Dkubernetes.namespace=<your-namespace> -Dkubernetes.cluster-id=<cluster-name> ~/path/to/project/<your-jar>.jar
    

    The TaskManager on the WebUI will not show StdOut logs. You'll need to kubectl logs -f <taskmanager-pod-name> -n <your-namespace> to see the job logs.

    Remember to port-forward 8081 if you want to see the Flink WebUI: kubectl port-forward svc/<cluster-name> 8081 -n <namespace>

    e.g. http://localhost:8081

    If you're using Minikube and you wish to access the cluster through the Flink LoadBalancer external IP you need to run minikube tunnel

    e.g. http://<external-ip>:8081