Tags: kubernetes, amazon-s3, apache-flink, kind, flink-checkpoint

Flink checkpoint upload to AWS S3 fails with ERROR: Forbidden (Status Code: 403)


I deployed a Flink application on a kind cluster (https://kind.sigs.k8s.io/) with 1 master and 2 worker nodes, using a YAML file.

As I want to upload Flink checkpoints to an S3 bucket, I manually created testBucket/checkpoints, but I got this error in the JobManager log:

Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: ***********; S3 Extended Request ID: ***********; Proxy: null)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715) ~[?:?]
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697) ~[?:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561) ~[?:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541) ~[?:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5456) ~[?:?]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5403) ~[?:?]
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1372) ~[?:?]
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1346) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$5(PrestoS3FileSystem.java:667) ~[?:?]
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:664) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:648) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:353) ~[?:?]
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760) ~[?:?]
at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:165) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:148) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:167) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:329) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1409) ~[flink-dist-1.17.1.jar:1.17.1]
... 7 more

Flink Deployment YAML file:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sampleDeployment
spec:
  image: sampleImage:*.**
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend: filesystem
    state.checkpoints.dir: s3://testBucket/checkpoints/
    state.backend.fs.checkpointdir: s3://testBucket/checkpoints/
    s3.access-key: *********
    s3.secret-key: *************
    s3.endpoint: https://s3.us-east-1.amazonaws.com
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///***.jar
    entryClass: com.***.***
    parallelism: 1
    upgradeMode: stateless
    state: running

I also added flink-s3-fs-hadoop-1.17.1.jar and flink-s3-fs-presto-1.17.1.jar to the plugins folder, based on this post (Apache Flink to use S3 for backend state and checkpoints).
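
For reference, Flink only picks up filesystem plugins that sit in their own subdirectory under plugins/ (a jar dropped directly into plugins/ is ignored), so the image should contain a layout along these lines; the subdirectory names below are illustrative, not mandated:

plugins/
├── s3-fs-hadoop/
│   └── flink-s3-fs-hadoop-1.17.1.jar
└── s3-fs-presto/
    └── flink-s3-fs-presto-1.17.1.jar

Since the stack trace shows PrestoS3FileSystem handling the request, the presto plugin has evidently loaded, so this is a sanity check rather than the likely culprit.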

Since I was able to manually create the S3 bucket, it shouldn't be an access-key/secret-key issue. Is there any solution to this? My AWS account has MFA enabled; could that be a potential cause?


Solution

  • There are multiple reasons why an S3 bucket can send back a 403 error:

    • There is a bucket policy that rejects some or all of the operations your requests perform.
    • There is an organization-wide SCP (service control policy) containing a DENY rule that affects the permissions of your credentials.
    • There is a permissions boundary attached to the IAM user or role that prevents it from calling S3 APIs.
    • There are user-group- or user-level DENY policies.

    Remember that in IAM, an explicit DENY always takes precedence over any ALLOW rule(s). For comparison, a minimal ALLOW policy for this checkpoint path is sketched below.
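
    Here is a minimal identity-based policy sketch that would cover Flink's checkpoint traffic. The bucket name testBucket and the checkpoints/ prefix are taken from the question; everything else (including the Sid labels) is an assumed, generic setup rather than the asker's actual policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "ListCheckpointBucket",
          "Effect": "Allow",
          "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
          "Resource": "arn:aws:s3:::testBucket"
        },
        {
          "Sid": "ReadWriteCheckpointObjects",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
          "Resource": "arn:aws:s3:::testBucket/checkpoints/*"
        }
      ]
    }

    Two details worth knowing: the failing call in the stack trace (getObjectMetadata, i.e. S3 HeadObject) is authorized by s3:GetObject, and a caller without s3:ListBucket receives 403 instead of 404 when it issues HeadObject for a key that does not exist, so a missing s3:ListBucket alone can surface as exactly this error. To check whether the 403 comes from the policy side rather than from Flink, replay the call with the same credentials via the AWS CLI, e.g. aws s3api head-object --bucket testBucket --key checkpoints/<some-object>.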