Search code examples
javagoogle-cloud-storagegoogle-cloud-platformgoogle-hadoop

Map tasks with input from Cloud Storage use only one worker


I am trying to use a file from Google Cloud Storage via FileInputFormat as input for a MapReduce job. The file is in Avro format.

As a simple test, I deployed a small Hadoop2 cluster with the bdutil tool, consisting of the master and two worker nodes with two slots each.

When running the job, the file is splitted into multiple parts. A fact which can be verified by looking into the logs where an offset is used to load data. As a result, multiple map tasks are created. So far nothing unusual.

But those map tasks do not get distributed among the worker nodes. Instead, two are started on just one node and the other ones are left in the Scheduled state.

I expected two map tasks on each worker to run, since the data is not locally available in any worker node (it's in the Cloud Storage), which makes them all equal candidates.

Why does this happen?


Solution

  • It appears you're seeing one of the artifacts of how YARN works; unlike Hadoop 1 where the JobTracker was effectively playing the role of Hadoop 2's AppMaster and ResourceManager at the same time, in Hadoop 2 the ResourceManager (running on your master node) actually packs a brand-new AppMaster onto a YARN container on-demand for each MapReduce job.

    Also, another concept that changes a bit is that you never quite have "slots", YARN containers actually schedule across both the memory and CPU dimensions. This means if a single task packed onto YARN requests lots of memory but only 1 CPU, it may occupy a resource footprint that otherwise might have packed several map or reduce tasks.

    For example, assuming you deployed 2 workers each n1-standard-2, you may see something like this on the ResourceManager page under http://<master-ip>:8088/cluster/nodes when running your mapreduce:

    ... Containers  Mem Used    Mem Avail   VCores Used VCores Avail    Version
    ...     1       5.50 GB     0 B         1           1               2.6.0
    ...     2       5 GB        512 MB      2           0               2.6.0
    

    In this case, visiting the application_master link from the ResourceManager showed that the ResourceManager was indeed packed onto the VM that reported 5.5GB Mem Used, 0B Mem Avail, 1 VCores Used. Similarly, I found that my map tasks were running only on the worker that reported 2 VCores Used.

    In general, this means if you're mostly interested in making sure it'll scale as you increase the number of workers, you don't have to do anything special; you'll just end up with your map or reduce tasks packed onto NUM_WORKERS - 1 possible machines while one of them runs the AppMaster for the job.

    Depending on your job though, this might be wasteful. The default settings are best for extremely large jobs where it makes sense to have a very large AppMaster to make sure it doesn't OOM tracking a large number of in-flight tasks. You can adjust the fine-grained settings by overriding NODEMANAGER_MEMORY_FRACTION, CORES_PER_MAP_TASK, CORES_PER_REDUCE_TASK, and CORES_PER_APP_MASTER in your custom *_env.sh file (or inline in hadoop2_env.sh, but this is harder to keep track of for upgrades vs maintaining a file like my_overrides_env.sh). The comments in bdutil's hadoop2_env.sh explain these settings:

    # Fraction of worker memory to be used for YARN containers
    NODEMANAGER_MEMORY_FRACTION=0.8
    
    # Decimal number controlling the size of map containers in memory and virtual
    # cores. Since by default Hadoop only supports memory based container
    # allocation, each map task will be given a container with roughly
    # (CORES_PER_MAP_TASK / <total-cores-on-node>) share of the memory available to
    # the NodeManager for containers. Thus an n1-standard-4 with CORES_PER_MAP_TASK
    # set to 2 would be able to host 4 / 2 = 2 map containers (and no other
    # containers). For more details see the script 'libexec/configure-mrv2-mem.py'.
    CORES_PER_MAP_TASK=1.0
    
    # Decimal number controlling the size of reduce containers in memory and virtual
    # cores. See CORES_PER_MAP_TASK for more details.
    CORES_PER_REDUCE_TASK=2.0
    
    # Decimal number controlling the size of application master containers in memory
    # and virtual cores. See CORES_PER_MAP_TASK for more details.
    CORES_PER_APP_MASTER=2.0
    

    Especially if you move up to larger machines like n1-standard-4, you may consider simply modifying NODEMANAGER_MEMORY_FRACTION to a smaller value.