Hadoop performance modeling


I am working on Hadoop performance modeling. Hadoop has 200+ parameters, so setting them manually is not practical. As a result, we often run Hadoop jobs with default parameter values (for example, the defaults for io.sort.mb, io.sort.record.percent, mapred.output.compress, etc.), but the defaults give suboptimal performance. Herodotos Herodotou has done some work in this area (http://www.cs.duke.edu/starfish/files/vldb11-job-optimization.pdf) to improve performance, but I have the following doubts about that work:

  1. They fix the parameter values at job start time (according to the proportionality assumption about the data) for all phases (read, map, collect, etc.) of a MapReduce job. Can we instead set different values of these parameters for each phase at run time, according to the run-time environment (cluster configuration, underlying file system, etc.), by changing the Hadoop configuration files of a particular node, to get optimal performance from that node?
  2. They use a white-box model of Hadoop core (http://arxiv.org/pdf/1106.0940.pdf). Is that model still applicable to current Hadoop?

Solution

  • No, you cannot dynamically change MapReduce parameters per job per node.

    Configuring a set of nodes

    What you can do instead is set the configuration parameters statically per node in the configuration files (generally located in /etc/hadoop/conf), so that you get the most out of a cluster with mixed hardware configurations.

    Example: Assume you have 20 worker nodes with different hardware configurations like:

    • 10 with configuration of 128GB RAM, 24 Cores
    • 10 with configuration of 64GB RAM, 12 Cores

    In that case, you would want to configure each group of identical servers to get the most out of its hardware. For example, you would run more child tasks (mappers and reducers) on the worker nodes with more RAM and cores:

    • Nodes with 128GB RAM, 24 Cores => 36 worker tasks (mappers + reducers), JVM heap for each worker task would be around 3GB.
    • Nodes with 64GB RAM, 12 Cores => 18 worker tasks (mappers + reducers), JVM heap for each worker task would be around 3GB.

    So, you would configure each set of nodes with the parameters appropriate to its hardware.
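
    The sizing in the example above is simple arithmetic, and a small script can make it explicit. The sketch below is only an illustration: the function names worker_tasks, total_heap_gb and report_node are hypothetical, and the ratio of 1.5 tasks per core is inferred from the example's own numbers (36 tasks on 24 cores, 18 on 12), not a general Hadoop rule.

```shell
#!/bin/sh
# Hypothetical helpers that reproduce the sizing used in the example above.

# Worker tasks per node.
# Assumption: 1.5 tasks per core, inferred from 36/24 and 18/12.
worker_tasks() {
  # $1 = number of cores
  echo $(( $1 * 3 / 2 ))
}

# Total JVM heap (GB) claimed by all tasks on a node.
# Assumption: about 3 GB of heap per task, as in the example.
total_heap_gb() {
  # $1 = number of tasks, $2 = heap per task in GB (defaults to 3)
  echo $(( $1 * ${2:-3} ))
}

# Print a one-line summary for a node class.
report_node() {
  # $1 = RAM in GB, $2 = number of cores
  tasks=$(worker_tasks "$2")
  heap=$(total_heap_gb "$tasks")
  echo "RAM=${1}GB cores=${2}: ${tasks} tasks, ${heap}GB total task heap"
}

report_node 128 24
report_node 64 12
```

    Under these assumptions the larger nodes commit 108GB of their 128GB to task heaps and the smaller ones 54GB of 64GB; the remainder stays available for the operating system and the Hadoop daemons.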

    Using ToolRunner to pass configuration parameters dynamically to a Job:

    You can also change MapReduce parameters dynamically per job, but they then apply to that job across the entire cluster, not just to a set of nodes. This works provided your MapReduce job driver implements the Tool interface and is launched through ToolRunner.

    ToolRunner parses the generic Hadoop command-line arguments, so you can pass MapReduce configuration parameters using -D property.name=property.value.

    You can pass almost any Hadoop parameter to a job this way. The MapReduce configuration parameters most commonly passed per job are:

    • mapreduce.task.io.sort.mb
    • mapreduce.map.speculative
    • mapreduce.job.reduces
    • mapreduce.task.io.sort.factor
    • mapreduce.map.output.compress
    • mapreduce.map.output.compress.codec
    • mapreduce.reduce.memory.mb
    • mapreduce.map.memory.mb

    Here is an example terasort job passing lots of parameters dynamically per job:

    
    hadoop jar hadoop-mapreduce-examples.jar terasort \
      -Ddfs.replication=1 -Dmapreduce.task.io.sort.mb=500 \
      -Dmapreduce.map.sort.spill.percent=0.9 \
      -Dmapreduce.reduce.shuffle.parallelcopies=10 \
      -Dmapreduce.reduce.shuffle.memory.limit.percent=0.1 \
      -Dmapreduce.reduce.shuffle.input.buffer.percent=0.95 \
      -Dmapreduce.reduce.input.buffer.percent=0.95 \
      -Dmapreduce.reduce.shuffle.merge.percent=0.95 \
      -Dmapreduce.reduce.merge.inmem.threshold=0 \
      -Dmapreduce.job.speculative.speculativecap=0.05 \
      -Dmapreduce.map.speculative=false \
      -Dmapreduce.reduce.speculative=false \
      -Dmapreduce.job.jvm.numtasks=-1 \
      -Dmapreduce.job.reduces=84 \
      -Dmapreduce.task.io.sort.factor=100 \
      -Dmapreduce.map.output.compress=true \
      -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec \
      -Dmapreduce.job.reduce.slowstart.completedmaps=0.4 \
      -Dmapreduce.reduce.merge.memtomem.enabled=false \
      -Dmapreduce.reduce.memory.totalbytes=12348030976 \
      -Dmapreduce.reduce.memory.mb=12288 \
      -Dmapreduce.reduce.java.opts="-Xms11776m -Xmx11776m \
        -XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode \
        -XX:+CMSIncrementalPacing -XX:ParallelGCThreads=4" \
      -Dmapreduce.map.memory.mb=4096 \
      -Dmapreduce.map.java.opts="-Xmx1356m" \
      /terasort-input /terasort-output
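
    Long -D lists like the one above are easy to mistype. As a rough sketch, the flags can be assembled from a list of property=value pairs and printed for review before anything is submitted. build_terasort_cmd is a hypothetical helper, not part of Hadoop; the jar name and paths are taken from the example above; and values containing spaces (such as the java.opts settings) are deliberately not handled here.

```shell
#!/bin/sh
# Hypothetical helper: print (not run) a terasort command line whose -D flags
# are built from "property=value" arguments.
build_terasort_cmd() {
  cmd="hadoop jar hadoop-mapreduce-examples.jar terasort"
  for kv in "$@"; do
    case "$kv" in
      # Accept only arguments that look like property=value.
      *=*) cmd="$cmd -D$kv" ;;
      # Report anything else on stderr and leave it out of the command.
      *)   echo "skipping malformed pair: $kv" >&2 ;;
    esac
  done
  # Generic -D options must come before the job's own arguments.
  echo "$cmd /terasort-input /terasort-output"
}

# Dry run with a few of the commonly passed parameters.
build_terasort_cmd \
  mapreduce.job.reduces=84 \
  mapreduce.map.output.compress=true \
  mapreduce.map.memory.mb=4096
```

    Printing the command first keeps the sketch safe to run on a machine without Hadoop; the printed line can then be copied into a terminal on the cluster.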