Tags: hadoop, hive, hdfs, hadoop-yarn, cloudera

How do I fix "File could only be replicated to 0 nodes instead of minReplication (=1)."?


I asked a similar question a while ago and thought I had solved this problem, but it turned out that it went away simply because I was working on a smaller dataset.

Numerous people have asked this question and I have gone through every single internet post that I could find and still didn't make any progress.

What I'm trying to do is this: I have an external table browserdata in Hive that refers to about 1 gigabyte of data. I am trying to insert that data into a partitioned table partbrowserdata, which is defined like this:

CREATE EXTERNAL TABLE IF NOT EXISTS partbrowserdata (
    BidID string,
    Timestamp_ string,
    iPinYouID string,
    UserAgent string,
    IP string,
    RegionID int,
    AdExchange int,
    Domain string,
    URL string,
    AnonymousURL string,
    AdSlotID string,
    AdSlotWidth int,
    AdSlotHeight int,
    AdSlotVisibility string,
    AdSlotFormat string,
    AdSlotFloorPrice decimal,
    CreativeID string,
    BiddingPrice decimal,
    AdvertiserID string,
    UserProfileIDs array<string>
)
PARTITIONED BY (CityID int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/user/maria_dev/data2';

with this query:

insert into table partbrowserdata partition(cityid)
select BidID, Timestamp_, iPinYouID, UserAgent, IP, RegionID, AdExchange, Domain, URL, AnonymousURL, AdSlotID, AdSlotWidth, AdSlotHeight, AdSlotVisibility, AdSlotFormat, AdSlotFloorPrice, CreativeID, BiddingPrice, AdvertiserID, UserProfileIDs, CityID
from browserdata;

And every time, on every platform, be it Hortonworks or Cloudera, I get this message:

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/maria_dev/data2/.hive-staging_hive_2019-02-06_18-58-39_333_7627883726303986643-1/_task_tmp.-ext-10000/cityid=219/_tmp.000000_3 could only be replicated to 0 nodes instead of minReplication (=1).  There are 4 datanode(s) running and no node(s) are excluded in this operation.
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1720)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3389)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:683)
        at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:214)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:495)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2211)

        at org.apache.hadoop.ipc.Client.call(Client.java:1504)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:413)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy15.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1814)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1610)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:773)

What do I do? I can't understand why this is happening. It seems like a memory issue, because I am able to insert a few rows but not all of them. Note that I have plenty of free space on HDFS, so 1 GB of extra data is negligible. Is it perhaps a RAM issue?

Here's my dfs report output:

[image: dfs report output]

I have tried this on all execution engines: Spark, Tez, and MapReduce.

Please do not suggest solutions that say that I need to format the namenode, because they do not work, and they are not solutions in any way.

update:

After looking at the NameNode logs, I noticed this, in case it helps:

Failed to place enough replicas, still in need of 1 to reach 1 (unavailableStorages=[DISK], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=true) All required storage types are unavailable: unavailableStorages=[DISK], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}

These logs suggest this:

For more information, please enable DEBUG log level on org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy and org.apache.hadoop.net.NetworkTopology

How do I do that?

I also noticed a similar unresolved post on here:

HDP 2.2@Linux/CentOS@OracleVM (Hortonworks) fails on remote submission from Eclipse@Windows

update 2:

I just tried partitioning this with Spark, and it works! So, this must be a Hive bug...

update 3:

Just tested this on MapR and it worked, but MapR doesn't use HDFS. This is definitely some sort of HDFS + Hive combination bug.

Proof:

[image]


Solution

  • I ended up reaching out to the Cloudera forums, and they answered my question in a matter of minutes: http://community.cloudera.com/t5/Storage-Random-Access-HDFS/Why-can-t-I-partition-a-1-gigabyte-dataset-into-300/m-p/86554#M3981. I tried what Harsh J suggested and it worked perfectly!

    Here's what he said:

    If you are dealing with unordered partitioning from a data source, you can end up creating a lot of files in parallel as the partitioning is attempted.

    In HDFS, when a file (or more specifically, its block) is open, the DataNode performs a logical reservation of its target block size. So if your configured block size is 128 MiB, then every concurrently open block will deduct that value (logically) from the available remaining space the DataNode publishes to the NameNode.

    This reservation is done to help manage space and guarantees of a full block write to a client, so that a client that's begun writing its file never runs into an out of space exception mid-way.

    Note: When the file is closed, only the actual length is persisted, and the reservation calculation is adjusted to reflect the reality of used and available space. However, while the file block remains open, it's always considered to be holding a full block size.

    The NameNode further will only select a DataNode for a write if it can guarantee full target block size. It will ignore any DataNodes it deems (based on its reported values and metrics) unfit for the requested write's parameters. Your error shows that the NameNode has stopped considering your only live DataNode when trying to allocate a new block request.

    As an example, 70 GiB of available space will prove insufficient if there will be more than 560 concurrent, open files (70 GiB divided into 128 MiB block sizes). So the DataNode will 'appear full' at the point of ~560 open files, and will no longer serve as a valid target for further file requests.

    It appears per your description of the insert that this is likely, as each of the 300 chunks of the dataset may still carry varied IDs, resulting in a lot of open files requested per parallel task, for insert into several different partitions.

    You could 'hack' your way around this by reducing the request block size within the query (set dfs.blocksize to 8 MiB for ex.), influencing the reservation calculation. However, this may not be a good idea for larger datasets as you scale, since it will drive up the file:block count and increase memory costs for the NameNode.
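
    As a rough illustration of the reservation arithmetic and the block-size workaround described above (a sketch, not part of the quoted reply): the figures of 70 GiB and 128 MiB come from the example, and 8388608 is simply 8 MiB expressed in bytes. Setting dfs.blocksize from the Hive session is assumed to reach the tasks that write the files; whether it does may depend on how the cluster is configured.

    ```sql
    -- How many concurrently open blocks fit into 70 GiB of free space?
    -- 70 GiB = 70 * 1024 MiB = 71680 MiB
    -- 128 MiB blocks: 71680 / 128 = 560 open blocks
    --   8 MiB blocks: 71680 / 8   = 8960 open blocks

    -- Workaround: request a smaller block size for this session only
    -- (8 * 1024 * 1024 = 8388608 bytes).
    SET dfs.blocksize=8388608;
    ```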

    A better way to approach this would be to perform a pre-partitioned insert (sort first by partition and then insert in a partitioned manner). Hive for example provides this as an option: hive.optimize.sort.dynamic.partition, and if you use plain Spark or MapReduce then their default strategy of partitioning does exactly this.
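
    A minimal sketch of such a pre-partitioned insert in Hive, reusing the table and column names from the question (an illustration, not part of the quoted reply). The two hive.exec.dynamic.partition settings are an assumption: they are commonly needed for a fully dynamic partition insert and may already be set on your cluster.

    ```sql
    -- Assumed prerequisites for a fully dynamic partition insert.
    SET hive.exec.dynamic.partition=true;
    SET hive.exec.dynamic.partition.mode=nonstrict;

    -- Sort rows by the partition column before writing, so that each
    -- reducer keeps only one record writer (one open file) at a time.
    SET hive.optimize.sort.dynamic.partition=true;

    INSERT INTO TABLE partbrowserdata PARTITION (cityid)
    SELECT BidID, Timestamp_, iPinYouID, UserAgent, IP, RegionID, AdExchange,
           Domain, URL, AnonymousURL, AdSlotID, AdSlotWidth, AdSlotHeight,
           AdSlotVisibility, AdSlotFormat, AdSlotFloorPrice, CreativeID,
           BiddingPrice, AdvertiserID, UserProfileIDs, CityID
    FROM browserdata;
    ```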

    So, at the end of the day I did set hive.optimize.sort.dynamic.partition=true; and everything started working. But I also did another thing.

    Here's one of my earlier posts from when I was investigating this issue: Why do I get "File could only be replicated to 0 nodes" when writing to a partitioned table? I was running into a problem where Hive couldn't partition my dataset because hive.exec.max.dynamic.partitions was set to 100. I googled the issue and found an answer somewhere on the Hortonworks forums saying that I should just do this:

    SET hive.exec.max.dynamic.partitions=100000; 
    SET hive.exec.max.dynamic.partitions.pernode=100000;
    

    This turned out to be another problem. Perhaps Hive tries to open as many concurrent connections as hive.exec.max.dynamic.partitions allows; in any case, my insert query didn't start working until I decreased these values to 500.
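
    For reference, the lowered limits would look roughly like this. The value 500 is the one mentioned above. For another dataset the right value is an assumption you would need to check: it must be at least the number of distinct partition values (CityID here), otherwise Hive aborts the insert with a partition-limit error.

    ```sql
    -- Limits on how many dynamic partitions one statement may create,
    -- in total and per mapper/reducer node (value taken from this post).
    SET hive.exec.max.dynamic.partitions=500;
    SET hive.exec.max.dynamic.partitions.pernode=500;
    ```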