
s3distcp copy from S3 to EMR HDFS: data replica always on one node


I am using s3distcp to copy a 500 GB dataset into my EMR cluster. It's a 12-node r4.4xlarge cluster, each node with a 750 GB disk. It uses the EMR release label emr-5.13.0, and I'm adding Hadoop: Amazon 2.8.3, Ganglia: 3.7.2 and Spark 2.3.0. I'm using the following command to copy the data into the cluster:

s3-dist-cp --src=s3://bucket/prefix/ --dest=hdfs:///local/path/ --groupBy='.*(part_).*' --targetSize=128 --outputCodec=none
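For readers unfamiliar with the flags: according to the S3DistCp documentation, `--groupBy` concatenates files whose names match the regular expression, grouping them by the text captured in the parentheses; `--targetSize` is the approximate size, in MiB, of the files produced from each group; and `--outputCodec=none` writes the output uncompressed. The Python below is a rough model of the grouping step only, not s3distcp's actual implementation, and the object keys are hypothetical. Because `(part_)` captures the same literal text for every matching key, all matching files fall into a single group.

```python
import re
from collections import defaultdict

# The --groupBy pattern from the question. Assumption: the whole key must match
# (Python's fullmatch). Since the pattern starts and ends with .*, a
# search-style match would capture the same text here.
GROUP_BY = re.compile(r".*(part_).*")


def group_files(keys):
    """Bucket object keys by the text captured by the --groupBy group."""
    groups = defaultdict(list)
    for key in keys:
        m = GROUP_BY.fullmatch(key)
        if m is not None:  # this sketch simply skips non-matching keys
            groups[m.group(1)].append(key)
    return dict(groups)


def approx_output_files(group_mib, target_mib=128):
    """Rough number of part files for one group with --targetSize=target_mib."""
    return -(-group_mib // target_mib)  # ceiling division


# Hypothetical object keys, for illustration only.
keys = [
    "s3://bucket/prefix/part_00000.bz2",
    "s3://bucket/prefix/part_00001.bz2",
    "s3://bucket/prefix/part_00002.bz2",
]
print(list(group_files(keys)))    # ['part_']: every key lands in one group
print(approx_output_files(1000))  # 8
```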

When I look at the disk usage in either Ganglia or the NameNode UI (port 50070 on the EMR cluster), I can see that one node has most of its disk filled, while the others all have a similar, lower percentage used. Clicking through a lot of the files (~50), I can see that a replica of each file always appears on the full node.

I'm using Spark to transform this data, write it to HDFS and then copy it back to S3. I'm having trouble with this dataset because my tasks are being killed, although I'm not certain this is the cause of the problem. I don't actually need to copy the data locally, nor decompress it. Initially I thought the BZIP2 codec was not splittable and that decompressing would help gain parallelism in my Spark jobs, but I was wrong: it is splittable. I have also discovered the hdfs balancer command, which I'm using to redistribute the replicas to see whether this solves my Spark problems.

However, now that I've seen what looks like odd behaviour, I would like to understand: is it normal for s3distcp/HDFS to always create a replica of every file on the same node?


Solution

  • s3distcp is closed source; I can't comment in detail about its internals.

    When HDFS creates replicas of data, it tries to store one replica of each block on the local machine (the one doing the write), then 2 more elsewhere (assuming replication == 3). Whichever host is running the distcp worker processes will therefore end up with a copy of the entire file. So if only one host is used for the copy, that host fills up.

    FWIW, I don't believe you need that distcp at all if you can read and filter the data straight off S3 and save the result to HDFS. Your Spark workers will do the filtering and write their blocks to the machines running those workers and to the other hosts in the replication chain. For short-lived clusters you could also try lowering the HDFS replication factor (to 2?) to save HDFS space across the cluster, at the cost of having one less place for Spark to schedule work adjacent to the data.
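The replica-placement behaviour described in the answer can be illustrated with a toy Python model. It assumes a single-rack cluster (no rack awareness configured) and simplifies HDFS's default policy to: first replica on the writing node, remaining replicas on distinct, randomly chosen other nodes. Real HDFS also weighs free space, node load and rack topology, which this sketch ignores; `place_replicas` and the block counts are hypothetical.

```python
import random
from collections import Counter


def place_replicas(writers, num_nodes, replication=3, seed=0):
    """Toy model of HDFS replica placement on a single-rack cluster.

    Each entry in `writers` is the node that writes one block. Returns a
    Counter mapping node id -> number of block replicas stored on it.
    """
    rng = random.Random(seed)
    usage = Counter()
    for writer in writers:
        # Replica 1: the writer's own DataNode.
        # Replicas 2..N: distinct nodes other than the writer, chosen at random.
        others = [n for n in range(num_nodes) if n != writer]
        targets = [writer] + rng.sample(others, replication - 1)
        usage.update(targets)
    return usage


NODES, BLOCKS = 12, 4000

# Case 1: a single worker on node 0 writes every block.
one_writer = place_replicas([0] * BLOCKS, NODES)
# Case 2: writers are spread round-robin across all nodes.
spread = place_replicas([b % NODES for b in range(BLOCKS)], NODES)

print(one_writer[0])  # 4000: node 0 holds a replica of every block
print(max(one_writer[n] for n in range(1, NODES)))  # the others hold far fewer
print(min(spread.values()), max(spread.values()))   # roughly even usage
```

With one writer, node 0 ends up holding a replica of every block, while spreading the writers evens out usage; this is the kind of skew the `hdfs balancer` then has to undo.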
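A back-of-the-envelope calculation with the figures from the question shows what lowering the replication factor buys. It treats 500 GB as the size of the data once it is in HDFS; because `--outputCodec=none` writes the files uncompressed, the real figure may be larger. It also ignores non-HDFS disk use such as Spark shuffle files and logs. `hdfs_footprint` is a hypothetical helper for this arithmetic.

```python
def hdfs_footprint(data_gb, replication, num_nodes, disk_gb_per_node):
    """Raw HDFS usage for `data_gb` of files at a given replication factor."""
    total_gb = data_gb * replication
    even_share_gb = total_gb / num_nodes  # per node, if replicas were spread evenly
    return total_gb, even_share_gb, even_share_gb / disk_gb_per_node


# Figures from the question: 500 GB of data, 12 nodes with 750 GB of disk each.
for rep in (3, 2):
    total, share, frac = hdfs_footprint(500, rep, 12, 750)
    print(f"replication={rep}: {total} GB total, {share:.1f} GB/node ({frac:.1%})")
# replication=3: 1500 GB total, 125.0 GB/node (16.7%)
# replication=2: 1000 GB total, 83.3 GB/node (11.1%)

# With a single writer, that node holds one full copy whatever the replication.
print(f"single-writer node: {500 / 750:.1%} of its disk")  # 66.7%
```

For files already in HDFS, `hdfs dfs -setrep -w 2 /local/path/` lowers the replication of existing files; for new output written by Spark, setting `spark.hadoop.dfs.replication=2` is one way to pass the HDFS client setting through.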