Tags: hadoop-yarn, apache-flink

Why is the Flink container vcore size always 1?


I am running Flink on YARN (more precisely, in an AWS EMR YARN cluster).

From the Flink documentation and source code, I understand that by default, for each TaskManager container, Flink requests a number of vcores equal to the number of slots per TaskManager when requesting resources from YARN. I also confirmed this in the source code:

// Resource requirements for worker containers
int taskManagerSlots = taskManagerParameters.numSlots();
int vcores = config.getInteger(ConfigConstants.YARN_VCORES,
        Math.max(taskManagerSlots, 1));
Resource capability = Resource.newInstance(containerMemorySizeMB, vcores);

resourceManagerClient.addContainerRequest(
        new AMRMClient.ContainerRequest(capability, null, null, priority));
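
As the snippet shows, the vcore count falls back to the slot count only when `yarn.containers.vcores` (the key behind `ConfigConstants.YARN_VCORES`) is unset. It can also be pinned explicitly in `flink-conf.yaml`:

```yaml
# Optional: request a fixed number of vcores per TaskManager container,
# overriding the default of one vcore per task slot.
yarn.containers.vcores: 3
```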

When I use -yn 1 -ys 3 to start Flink, I expect YARN to allocate 3 vcores for the only TaskManager container, but when I check the number of vcores per container in the YARN ResourceManager web UI, I always see 1. The ResourceManager logs also show 1 vcore.

I debugged the Flink source code down to the lines pasted above, and the value of vcores was 3. This really confuses me; can anyone help clarify? Thanks.


Solution

  • An answer from Kien Truong

    Hi,

    You have to enable CPU scheduling in YARN; otherwise, it always shows that only 1 vcore is allocated for each container, regardless of how many Flink tries to allocate. You should add (or edit) the following property in capacity-scheduler.xml:

    <property>
     <name>yarn.scheduler.capacity.resource-calculator</name>
     <!-- <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value> -->
     <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    </property>
    

    Also, when the TaskManager memory is, for example, 1400 MB, Flink reserves some of it for off-heap memory, so the actual heap size is smaller.

    This is controlled by 2 settings:

    containerized.heap-cutoff-min: 600 MB by default
    
    containerized.heap-cutoff-ratio: 25% of the TM's memory by default
    

    The cutoff subtracted from the container memory is the larger of the two.

    That's why your TM's heap size is limited to ~800 MB (1400 - 600: the 600 MB minimum cutoff is larger than 25% of 1400 MB, so it applies).
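
    The effective heap size can be sketched as container memory minus the larger of the two cutoffs. This is a simplified model of the behavior described above, not the exact Flink source:

```java
// Simplified sketch of the containerized heap cutoff calculation
// (assumption: cutoff = max(cutoff-min, cutoff-ratio * container memory)).
public class HeapCutoff {

    static long heapSizeMB(long containerMemoryMB, long cutoffMinMB, double cutoffRatio) {
        // The reserved off-heap cutoff is the larger of the fixed minimum
        // and the ratio-based amount.
        long cutoffMB = Math.max(cutoffMinMB, (long) (containerMemoryMB * cutoffRatio));
        return containerMemoryMB - cutoffMB;
    }

    public static void main(String[] args) {
        // 1400 MB container, 600 MB minimum cutoff, 25% ratio:
        // 25% of 1400 = 350 MB, but the 600 MB minimum wins -> heap = 800 MB
        System.out.println(heapSizeMB(1400, 600, 0.25)); // prints 800
    }
}
```

    With a larger container (e.g. 4096 MB), the ratio dominates instead: 25% of 4096 = 1024 MB is reserved, leaving 3072 MB of heap.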