I know that the `dfs.block.replicator.classname` property can be used to change the `BlockPlacementPolicy`. I want to know when exactly this policy is used to place data. For example, is it used when `-copyFromLocal`/`-put` are executed? I think the output of a job will also be placed according to this policy.
Secondly, when the property is specified in the configuration file, it affects the entire Hadoop cluster. If I am using a shared cluster, is there a way to change the block placement policy only for jobs executed under my user, or to change the policy for each job?
I am using the Hadoop streaming jar on a 4-node cluster.
The block placement policy is used whenever a new block of data is written to HDFS, for example when data is ingested into HDFS or when a job writes its output to HDFS. Its purpose is to place blocks optimally, so that blocks are distributed uniformly across the HDFS cluster.
For example, the algorithm used by the default block placement policy class (`BlockPlacementPolicyDefault`) is:

> The replica placement strategy is that if the writer is on a datanode, the 1st replica is placed on the local machine, otherwise a random datanode. The 2nd replica is placed on a datanode that is on a different rack. The 3rd replica is placed on a datanode which is on a different node of the rack as the second replica.
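To make the three rules concrete, here is a toy, self-contained Java model of them. It is not Hadoop code: `ToyDefaultPlacement`, `chooseTargets` and the plain node-to-rack map are hypothetical stand-ins. The real class works on the cluster's network topology and has fallbacks (for example, for a single-rack cluster or excluded nodes) that this sketch replaces with an exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.function.Predicate;

// Toy model of the default three-replica rule; all names here are hypothetical.
final class ToyDefaultPlacement {
    private final Map<String, String> rackOf; // datanode -> rack, e.g. "dn1" -> "/r1"
    private final Random random;

    ToyDefaultPlacement(Map<String, String> rackOf, long seed) {
        this.rackOf = new TreeMap<>(rackOf); // sorted so a fixed seed gives repeatable picks
        this.random = new Random(seed);
    }

    /** Picks 3 targets; writer is the client's host, or null if it is not a datanode. */
    List<String> chooseTargets(String writer) {
        List<String> targets = new ArrayList<>();
        // 1st replica: the writer's own datanode if it is one, otherwise a random datanode.
        final String first = (writer != null && rackOf.containsKey(writer))
                ? writer
                : pick(node -> true);
        targets.add(first);
        // 2nd replica: a datanode on a different rack from the 1st.
        final String second = pick(node -> !rackOf.get(node).equals(rackOf.get(first)));
        targets.add(second);
        // 3rd replica: a different datanode on the same rack as the 2nd.
        final String third = pick(node ->
                rackOf.get(node).equals(rackOf.get(second)) && !node.equals(second));
        targets.add(third);
        return targets;
    }

    // Uniformly random choice among the datanodes that satisfy the constraint.
    private String pick(Predicate<String> allowed) {
        List<String> candidates = new ArrayList<>();
        for (String node : rackOf.keySet()) {
            if (allowed.test(node)) {
                candidates.add(node);
            }
        }
        if (candidates.isEmpty()) {
            // The real policy has fallbacks for this case; the toy simply gives up.
            throw new IllegalStateException("no datanode satisfies the placement rule");
        }
        return candidates.get(random.nextInt(candidates.size()));
    }
}
```

On a 4-node cluster split over two racks (`dn1`, `dn2` on `/r1`; `dn3`, `dn4` on `/r2`), a write from `dn1` always puts the first replica on `dn1` and the other two on `dn3` and `dn4`, in some order.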
The block placement policy is also used by the following HDFS utilities:

- Balancer: `BlockPlacementPolicy` can be used when placing blocks on other nodes in order to rebalance the cluster.
- fsck: `BlockPlacementPolicy` is used to check the number of mis-replicated blocks.

You can have your own custom block placement class. To do that, extend the `BlockPlacementPolicy` class and set the configuration parameter `dfs.block.replicator.classname` to your custom class name in `hdfs-site.xml`.
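A custom policy is a subclass that the NameNode instantiates by name. The sketch below shows that extension pattern in isolation. It uses a hypothetical abstract base, `ToyPlacementPolicy`, in place of Hadoop's `BlockPlacementPolicy`. The real abstract class declares more methods, and their signatures differ between Hadoop versions, so check the version you build against. `RoundRobinPlacementPolicy` is likewise a made-up example that ignores racks and the writer entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, greatly simplified stand-in for Hadoop's abstract BlockPlacementPolicy.
abstract class ToyPlacementPolicy {
    /** Returns the datanodes that should receive the replicas of one new block. */
    abstract List<String> chooseTargets(String writer, int replicas, List<String> datanodes);
}

// Hypothetical custom policy: round-robin over the datanodes, ignoring racks and the writer.
class RoundRobinPlacementPolicy extends ToyPlacementPolicy {
    private int next = 0; // index of the datanode that gets the next block's 1st replica

    @Override
    List<String> chooseTargets(String writer, int replicas, List<String> datanodes) {
        int n = datanodes.size();
        if (n == 0 || replicas > n) {
            throw new IllegalArgumentException(n + " datanodes cannot hold " + replicas + " replicas");
        }
        int start = next % n;
        List<String> targets = new ArrayList<>();
        for (int i = 0; i < replicas; i++) {
            targets.add(datanodes.get((start + i) % n)); // distinct nodes because replicas <= n
        }
        next = (start + replicas) % n;
        return targets;
    }
}
```

In a real deployment, the fully qualified name of your actual `BlockPlacementPolicy` subclass is the value that goes into `dfs.block.replicator.classname`, and the class must be loadable by the NameNode, since that is where it is instantiated.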
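By default, the `BlockPlacementPolicyDefault` class is used for block placement:

```java
// Read dfs.block.replicator.classname; when it is not set, fall back to the default
// (BlockPlacementPolicyDefault). The class must be a subclass of BlockPlacementPolicy.
final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
    DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
    DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
    BlockPlacementPolicy.class);
```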
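The `conf.getClass(...)` call above looks up a class name, falls back to a default when the key is unset, and checks that the result is a subtype of `BlockPlacementPolicy`. A toy, Hadoop-free analogue of that lookup is sketched below. `ToyClassResolver`, `Placement` and the two implementations are hypothetical, and a plain `Map` stands in for Hadoop's `Configuration`.

```java
import java.util.Map;

// Hypothetical policy type and two implementations to choose between.
interface Placement {
    String name();
}

class DefaultPlacement implements Placement {
    public String name() { return "default"; }
}

class RackAwarePlacement implements Placement {
    public String name() { return "rack-aware"; }
}

// Toy analogue of conf.getClass(key, defaultValue, xface) plus reflective instantiation.
final class ToyClassResolver {
    static final String KEY = "dfs.block.replicator.classname";

    static <T> Class<? extends T> resolveClass(Map<String, String> conf, String key,
                                               Class<? extends T> defaultValue, Class<T> xface) {
        String name = conf.get(key);
        if (name == null || name.trim().isEmpty()) {
            return defaultValue; // key unset: use the default class
        }
        try {
            Class<?> c = Class.forName(name.trim());
            if (!xface.isAssignableFrom(c)) { // fail fast on a class of the wrong type
                throw new IllegalArgumentException(c.getName() + " is not a " + xface.getName());
            }
            return c.asSubclass(xface);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("class not found: " + name, e);
        }
    }

    static <T> T newInstance(Class<? extends T> c) {
        try {
            return c.getDeclaredConstructor().newInstance(); // requires a no-arg constructor
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + c.getName(), e);
        }
    }
}
```

Resolving the class from configuration at startup, rather than hard-coding it, is what makes the policy pluggable. It is also why the choice is fixed once the instance has been created.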
You can't change the block placement policy for each job (or each user). The reason is that the block placement policy is instantiated only once, when the NameNode starts up.
The following sequence of calls initializes `BlockPlacementPolicy`. These steps are executed when the NameNode is started:
1. Initialize `NameNode` when the NameNode is started:

   ```java
   // Methods of NameNode, called during startup
   initialize(conf);      // Initialize NameNode
   loadNamesystem(conf);  // Load name system (invoked from initialize)
   ```

2. Initialize `FSNamesystem`. `FSNamesystem` does all the bookkeeping work on the NameNode:

   ```java
   FSNamesystem.loadFromDisk(conf); // Loads the FSImage from disk
   ```

3. Instantiate `BlockManager`. This is called while instantiating `FSNamesystem`:

   ```java
   this.blockManager = new BlockManager(this, conf);
   ```

4. Instantiate `BlockPlacementPolicy`. This is called by `BlockManager`:

   ```java
   blockplacement = BlockPlacementPolicy.getInstance(
       conf, datanodeManager.getFSClusterStats(),
       datanodeManager.getNetworkTopology(),
       datanodeManager.getHost2DatanodeMap());
   ```
Since the policy is instantiated only once, you can't change it for each job.
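The practical consequence can be illustrated with a toy model. All classes here are hypothetical, not Hadoop's. The policy is resolved once from the cluster configuration when the "NameNode" is built, and a job's own configuration is never consulted when a block is placed. In this model, changing the cluster configuration afterwards also has no effect until a new instance reads it. In real life, that new instance corresponds to a restarted NameNode.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: the policy is chosen once, in the constructor, the way BlockManager
// creates it while the NameNode starts up. All names are hypothetical.
final class ToyBlockManager {
    private final String placementPolicy; // fixed for the lifetime of this "NameNode"

    ToyBlockManager(Map<String, String> clusterConf) {
        placementPolicy = clusterConf.getOrDefault(
                "dfs.block.replicator.classname", "BlockPlacementPolicyDefault");
    }

    /** Returns the policy used for a new block; the job's configuration is ignored. */
    String placeNewBlock(Map<String, String> jobConf) {
        return placementPolicy;
    }
}

final class ToyNameNode {
    final ToyBlockManager blockManager;

    ToyNameNode(Map<String, String> clusterConf) {
        // Stands in for initialize -> loadNamesystem -> FSNamesystem -> BlockManager -> policy.
        // The configuration is copied, so later edits to the caller's map are not seen.
        this.blockManager = new ToyBlockManager(new HashMap<>(clusterConf));
    }
}
```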