Tags: hadoop, mapreduce, hive, hdfs, elastic-map-reduce

Hive / MapReduce job on a Hadoop cluster: how to (roughly) calculate the disk space needed?


Consider the following use case:

I run a Hive query on about 500 GB of gzip-compressed (.gz) data:

select count(distinct c1), c2 from t1 group by c2;

This query results in ~2800 map tasks and ~400 reduce tasks.

On a Hadoop cluster of 20 instances with 160 GB of instance storage each, the job stops at 97% map and 21% reduce progress, falls back to 94% map and 19% reduce progress, and then makes no further progress at all. I think this is because HDFS has reached its disk space limit. I may be able to provide an exception message later today.

However: is there a way to roughly precalculate the HDFS disk space needed, based on the size of the input data being processed? Remember, the input data is stored in .gz format.

Update

Does anyone know why my MapReduce job uses only the local storage of the nodes and not the DFS?

DFS usage overview http://img27.imageshack.us/img27/5805/dfsusageoverview.png

DFS usage detail http://img542.imageshack.us/img542/5026/dfsusagedetail.png

Exception from one of the mappers:

at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:550)
        at org.apache.hadoop.hive.ql.exec.ExecMapper.map(ExecMapper.java:143)
        ... 8 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Spill failed
        at org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.processOp(ReduceSinkOperator.java:304)
        at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
        at org.apache.hadoop.hive.ql.exec.GroupByOperator.forward(GroupByOperator.java:959)
        at org.apache.hadoop.hive.ql.exec.GroupByOperator.flush(GroupByOperator.java:926)
        at org.apache.hadoop.hive.ql.exec.GroupByOperator.processHashAggr(GroupByOperator.java:779)
        at org.apache.hadoop.hive.ql.exec.GroupByOperator.processOp(GroupByOperator.java:722)
        at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
        at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
        at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
        at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:83)
        at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
        at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:533)
        ... 9 more
Caused by: java.io.IOException: Spill failed
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1045)
        at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:599)
        at org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.processOp(ReduceSinkOperator.java:289)
        ... 24 more

Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/spill15.out
        at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
        at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
        at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
        at org.apache.hadoop.mapred.MapOutputFile.getSpillFileForWrite(MapOutputFile.java:121)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1408)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:869)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1360)

Solution

  • Here are a few notes taken from the Cloudera blog:

    Each file has a default replication factor of 3, and you need to leave approximately 25% of the disk space for intermediate shuffle files. So you need 4 times the raw size of the data you will store in HDFS. However, the files are rarely stored uncompressed and, depending on the file content and the compression algorithm, on average we have seen a compression ratio of up to 10-20 for the text files stored in HDFS. So the actual raw disk space required is only about 30-50% of the original uncompressed size.

    If I may add something: if space is really a limitation, consider compressing the intermediate output (between mapper and reducer) to shrink the intermediate shuffle files. You can do this as follows, for example with Gzip compression:

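    // Compress the intermediate map output before it is spilled and shuffled.
    conf.set("mapred.compress.map.output", "true");
    // Compression type for SequenceFile outputs (NONE, RECORD or BLOCK).
    conf.set("mapred.output.compression.type", "BLOCK");
    conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");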
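
The rule of thumb quoted above (replication factor 3, about 25% of the disk kept free for intermediate files) can be turned into a rough calculation. The following is only a minimal sketch: the class and method names are made up for illustration, and it covers just the HDFS storage of the data as it sits on disk. It does not model the map-side spill files from the stack trace, which are written to the nodes' local directories and hold uncompressed records unless map output compression is enabled.

```java
public class DiskSpaceEstimate {

    /**
     * Raw disk needed to store storedGb of data in HDFS.
     * Assumption (from the quoted rule of thumb): every block is kept
     * `replication` times, and `reservedFraction` of each disk stays
     * free for intermediate shuffle files.
     */
    public static double rawDiskNeededGb(double storedGb, int replication, double reservedFraction) {
        if (reservedFraction < 0.0 || reservedFraction >= 1.0) {
            throw new IllegalArgumentException("reservedFraction must be in [0, 1)");
        }
        return storedGb * replication / (1.0 - reservedFraction);
    }

    public static void main(String[] args) {
        double inputGb = 500;         // from the question: size of the .gz input
        int replication = 3;          // default replication factor named in the quote
        double reserved = 0.25;       // share of disk left for intermediate files
        double clusterGb = 20 * 160;  // from the question: 20 instances x 160 GB

        double needed = rawDiskNeededGb(inputGb, replication, reserved);
        System.out.printf("needed: %.0f GB, cluster: %.0f GB%n", needed, clusterGb);
    }
}
```

With the numbers from the question this gives 2000 GB needed against 3200 GB of raw cluster storage, so the compressed input alone fits. That matches the observation that the failure comes from local spill space rather than from HDFS itself.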