Search code examples
amazon-web-serviceshadoopamazon-s3amazon-emr

AWS EMR throws exception on Accelerated Endpoint configuration


Here is the EMR step I used,

s3-dist-cp --targetSize 1000 --outputCodec=gz --s3Endpoint=bucket.s3-accelerate.amazonaws.com --groupBy './(\d\d)/\d\d/\d\d/.' --src s3a://sourcebucket/ --dest s3a://destbucket/

Exception with the accelerated endpoint.

EMR version:

Release label:emr-5.13.0
Hadoop distribution:Amazon 2.8.3
Applications:Hive 2.3.2, Pig 0.17.0, Hue 4.1.0, Presto 0.194

What am I missing to pass in parameter for s3-dist-cp to overcome this error?

Exception in thread "main" com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: To enable accelerate mode, please use AmazonS3ClientBuilder.withAccelerateModeEnabled(true)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3941)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4824)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4830)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.provider.DefaultS3Provider.getS3(DefaultS3Provider.java:55)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.provider.DefaultS3Provider.getS3(DefaultS3Provider.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.getClient(GlobalS3Executor.java:122)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:89)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.doesBucketExist(AmazonS3LiteClient.java:88)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.ensureBucketExists(Jets3tNativeFileSystemStore.java:138)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:116)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.initialize(S3NativeFileSystem.java:448)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:109)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2859)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2896)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2878)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:392)
    at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:869)
    at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:705)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
    at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:234)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
Caused by: java.lang.IllegalStateException: To enable accelerate mode, please use AmazonS3ClientBuilder.withAccelerateModeEnabled(true)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.setEndpoint(AmazonS3Client.java:670)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonWebServiceClient.withEndpoint(AmazonWebServiceClient.java:897)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.provider.DefaultS3Provider$S3CacheLoader.load(DefaultS3Provider.java:62)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.provider.DefaultS3Provider$S3CacheLoader.load(DefaultS3Provider.java:58)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
    ... 30 more
Command exiting with ret '1'

Solution

  • s3-dist-cp is build on hadoop-aws library which is not supporting using accelerated bucket out of the box.

    You would like to make your own jar with dependencies to hadoop-aws and amazon-sdk-s3, translate required parameters there and extend s3ClientFactory to enable accelerated upload.

    Maven dependencies example:

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-s3</artifactId>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>  
    </dependency>
    

    S3 client factory:

    public class AcceleratedS3ClientFactory extends DefaultS3ClientFactory {
        @Override
        protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials, ClientConfiguration awsConf) {
            AmazonS3ClientBuilder s3Builder = AmazonS3ClientBuilder
                    .standard()
                    .withRegion("s3-accelerate.amazonaws.com")
                    .enableAccelerateMode();
            s3Builder.setCredentials(credentials);
            s3Builder.setClientConfiguration(awsConf);
    
            return s3Builder.build();
        }
    
        @Override
        public AmazonS3 createS3Client(URI name) throws IOException {
            AmazonS3 s3 = super.createS3Client(name);
            // load below bucket name from step configuration as well
            s3.setBucketAccelerateConfiguration("bucket-name",
                    new BucketAccelerateConfiguration(BucketAccelerateStatus.Enabled));
    
            return s3;
        }
    }
    

    The last step is to provide hadoop with your s3 factory class:

    <property>
      <name>fs.s3a.s3.client.factory.impl</name>
      <value>example_package.AcceleratedS3ClientFactory</value>
    </property>
    

    This also can be done from command line, so you can specify it in EMR interface or EMR SDK directly.

    For copy itself, can be used Hadoop FileUtil.copy API, you can specify source and destination there, as well as required configuration.

    For some specific file formats, or not FS based source or destination, Spark can be considered on above this utility. It could make a transfer faster for some cases.

    So now you can send a step with your jar to EMR:

    aws emr add-steps --cluster-id cluster_id \
    --steps Type=CUSTOM_JAR,Name="a step name",Jar=s3://app/my-s3distcp-1.0.jar,\
    Args=["key","value"]
    

    Put in the Args all required parameters, like source and destination s3 paths.

    Note: do not specify a bucket specific endpoint supported by hadoop-aws. It uses it in way not compatible with acceleration, you will get the same exception everytime.

    Links: