Search code examples
apache-flinkflink-streamingflink-cep

How to configure Flink to use S3 for backend state and checkpoints


I have a setup with Flink v1.2, 3 JobManagers, 2 TaskManagers. I want to use an S3 bucket instead of hdfs for backend state and checkpoints and zookeeper storageDir

fs.s3.accessKey: [accessKey]
fs.s3.secretKey: [secretKey]

state.backend: filesystem

state.backend.fs.checkpointdir: s3:///[bucket]/flink-checkpoints
state.checkpoints.dir: s3:///[bucket]/external-checkpoints
high-availability: zookeeper
high-availability.zookeeper.storageDir: s3:///[bucket]/recovery

In the JobManager I log I have

2017-03-22 09:52:40,971 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Error while starting up JobManager
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,981 WARN  org.apache.hadoop.security.UserGroupInformation               - PriviledgedActionException as:ubuntu (auth:SIMPLE) cause:java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
2017-03-22 09:52:40,981 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,983 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
2017-03-22 09:52:40,993 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.

I have no hadoop installed. Not sure if this is needed and if it is how / where should it be installed / configured?

Edit: After configuring Flink with following hadoop xml (core-site.xml) I did not really understand the IAM part and I'm not using EMR, I installed the cluster myself (in AWS) to be able to update Flink without depending on the image:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>s3://[ bucket ] </value>
    </property>

    <property>
        <name>fs.s3a.access.key</name>
        <description>[ Access Key ]</description>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <description>[ Secret Key ]</description>
    </property>

    <property>
        <name>fs.s3.awsAccessKeyId</name>
        <description>[ Access Key ]</description>
    </property>

    <property>
        <name>fs.s3.awsSecretAccessKey</name>
        <description>[ Secret Key ]</description>
    </property>

    <property>
        <name>fs.s3n.awsAccessKeyId</name>
        <value>[ Access Key ]</value>
    </property>

    <property>
        <name>fs.s3n.awsSecretAccessKey</name>
        <value>[ Secret Key ]</value>
    </property>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>
</configuration>

I get this error:

  2017-03-24 11:20:17,760 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Error while starting up JobManager
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
        at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
        at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
        at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:303)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:271)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

Edit: My mistake I set up the key in the description field instead of value.


Solution

  • Please check the guide on running Flink with S3 on how to set up S3.

    I think what you are missing is the hadoop configuration file with the fs.s3.impl configuration key. Even though you are not using Hadoop, you still need to use the Hadoop configuration file.