Tags: hadoop, apache-spark, apache-spark-sql, hadoop2, hadoop-partitioning

fs.rename(new Path(rawFileName), new Path(processFileName)) is not working


I am working on a Scala-based Apache Spark implementation that onboards data from a remote location into HDFS and then ingests that data from HDFS into Hive tables.

Using my first Spark job, I have onboarded the data/files into HDFS under the following folder:

hdfs://sandbox.hortonworks.com:8020/data/analytics/raw/

Suppose that after onboarding the files CT_Click_Basic.csv and CT_Click_Basic1.csv.gz, I have the following in HDFS. Each file name from the shared location becomes a folder name here, and its content is stored in part-xxxxx files inside that folder:

    [root@sandbox ~]# hdfs dfs -ls /data/analytics/raw/*/
    Found 3 items
    -rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS
    -rw-r--r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00000
    -rw-r--r-- 3 chauhan.bhupesh hdfs 8395 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00001
    Found 2 items
    -rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS
    -rw-r--r-- 3 chauhan.bhupesh hdfs 16588 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000

Now, using another Spark job, I want to move these files as-is from the /raw folder to /process, and finally to the /archive folder in HDFS, depending on the tasks performed at each stage.

To do that, I first fetch the list of all files under the /raw folder using the following code:

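    import org.apache.hadoop.fs.Path
    import scala.collection.mutable.ListBuffer

    // Collects every file (not directory) under filePath into filePaths.
    // GlobalContext.hdfs is the application's own HDFS FileSystem handle.
    def listAllFilesFolderInDir(filePath: String, recursiveTraverse: Boolean,
                                filePaths: ListBuffer[Path]): ListBuffer[Path] = {
      val files = GlobalContext.hdfs.listStatus(new Path(filePath))
      files.foreach { fileStatus =>
        if (!fileStatus.isDirectory) {
          filePaths += fileStatus.getPath
        } else if (recursiveTraverse) {
          // Descend into sub-folders such as CT_Click_Basic.csv/
          listAllFilesFolderInDir(fileStatus.getPath.toString, recursiveTraverse, filePaths)
        }
      }
      filePaths
    }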
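
As a sketch of a possibly simpler alternative: Hadoop's FileSystem.listFiles(Path, boolean) already walks the directory tree and returns only files, so the manual recursion and the shared ListBuffer may not be needed. The object and method names below (ListFilesSketch, listAllFiles) are hypothetical, and the FileSystem is passed in explicitly rather than taken from GlobalContext.

```scala
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import scala.collection.mutable.ListBuffer

// Hypothetical helper, not part of the original question.
object ListFilesSketch {
  // Returns every file (never a directory) under `dir`.
  // With recursive = false, only files directly inside `dir` are returned.
  def listAllFiles(fs: FileSystem, dir: Path, recursive: Boolean): List[Path] = {
    val it: RemoteIterator[LocatedFileStatus] = fs.listFiles(dir, recursive)
    val out = ListBuffer.empty[Path]
    // RemoteIterator is not a Scala Iterator, so it is drained manually.
    while (it.hasNext) out += it.next().getPath
    out.toList
  }
}
```

Called with recursive = true on /data/analytics/raw, this should return the _SUCCESS and part-xxxxx files inside each folder; with recursive = false it would return nothing for the layout shown above, because /raw itself contains only folders.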

Then I use the following code to rename/move the files from the /raw folder to the /process folder:

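    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.mutable.ListBuffer

    val inputDir = "/data/analytics/raw"
    val outputDir = "/data/analytics/process"
    val filePaths = new ListBuffer[Path]()
    val pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
    // How the HDFS FileSystem is obtained was elided in the original;
    // FileSystem.get(new Configuration()) is one common way (an assumption here).
    val fs = FileSystem.get(new Configuration())
    for (path <- pathArray) {
      val pathSplit = path.toString.split("/")
      val pathSplitSize = pathSplit.size
      // Rebuild "<dir>/<folder>/<part-file>" from the last two path components
      val rawFileName = inputDir + "/" + pathSplit(pathSplitSize - 2) + "/" + pathSplit(pathSplitSize - 1)
      val processFileName = outputDir + "/" + pathSplit(pathSplitSize - 2) + "/" + pathSplit(pathSplitSize - 1)
      fs.rename(new Path(rawFileName), new Path(processFileName)) // returns false here
    }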
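
A possibly less brittle way to build the destination than splitting the string on "/": Hadoop's Path already exposes getName and getParent. This sketch (hypothetical helper TargetPathSketch.processTarget) keeps the same "<outputDir>/<folder>/<part-file>" layout as the loop above. It only computes the path; it does not create the destination folder.

```scala
import org.apache.hadoop.fs.Path

// Hypothetical helper: maps .../raw/<folder>/<file> to <outputDir>/<folder>/<file>.
object TargetPathSketch {
  def processTarget(src: Path, outputDir: String): Path = {
    val folderName = src.getParent.getName // e.g. "CT_Click_Basic.csv"
    new Path(new Path(outputDir, folderName), src.getName)
  }
}
```

Because the source Path may carry a scheme and authority (hdfs://sandbox.hortonworks.com:8020), working with getParent/getName avoids depending on how many "/" segments that prefix contributes.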

But the code above does not move/rename these files. While debugging, I found that fs.rename() returns false.

Please note: renaming/moving does work if I manually copy a file (e.g. CT.csv, or any other file) directly into the /data/analytics/raw folder and then call fs.rename() on it, but it does not work for the part-xxxxx files.
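
FileSystem.rename() reports most failures by returning false rather than throwing, which makes cases like this hard to debug. A minimal diagnostic sketch, assuming the common HDFS preconditions (source exists, destination parent exists, destination is not an existing file); RenameCheck and whyRenameMayFail are hypothetical names, and the checks are not an exhaustive list of HDFS rename rules.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical diagnostic helper; returns None when no obvious problem is found.
object RenameCheck {
  def whyRenameMayFail(fs: FileSystem, src: Path, dst: Path): Option[String] = {
    val dstParent = dst.getParent // null only when dst is the root
    if (!fs.exists(src)) Some(s"source does not exist: $src")
    else if (dstParent != null && !fs.exists(dstParent))
      Some(s"destination parent does not exist: $dstParent")
    else if (fs.exists(dst) && fs.getFileStatus(dst).isFile)
      Some(s"destination file already exists: $dst")
    else None
  }
}
```

Calling it right before fs.rename() and logging any Some(...) result would surface which precondition failed instead of a bare false.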

Is there something I am missing?

Any quick help will be appreciated.

Regards, Bhupesh


Solution

  • Finally, I found the issue. I was trying to rename files from /data/analytics/raw/folder.csv/part-xxxxx to /data/analytics/process/folder.csv/part-xxxxx. /data/analytics/process existed in HDFS, but the folder.csv subfolder under it did not, so fs.rename() returned false. Creating that folder before renaming fixed it for me:

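    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.mutable.ListBuffer

    val inputDir = "/data/analytics/raw"
    val outputDir = "/data/analytics/process"
    val filePaths = new ListBuffer[Path]()
    val pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
    // Assumption: FileSystem obtained from the default Hadoop configuration (elided in the original)
    val fs = FileSystem.get(new Configuration())
    for (path <- pathArray) {
      val pathSplit = path.toString.split("/")
      val pathSplitSize = pathSplit.size
      val rawFileName = inputDir + "/" + pathSplit(pathSplitSize - 2) + "/" + pathSplit(pathSplitSize - 1)

      // Create the destination folder (e.g. /data/analytics/process/CT_Click_Basic.csv) first;
      // rename returned false while this parent folder was missing.
      val processFolderName = outputDir + "/" + pathSplit(pathSplitSize - 2)
      val processFolderPath = new Path(processFolderName)
      if (!fs.exists(processFolderPath))
        fs.mkdirs(processFolderPath)
      val processFileName = processFolderName + "/" + pathSplit(pathSplitSize - 1)
      fs.rename(new Path(rawFileName), new Path(processFileName))
    }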
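
Since each Spark output here is a folder (CT_Click_Basic.csv/, CT_Click_Basic1.csv.gz/), another option worth considering is renaming the whole folder in one call instead of moving each part file. HDFS supports renaming directories, and then only the stage directory (for example /data/analytics/process) has to exist beforehand. This is a sketch under those assumptions: StageMover and moveFolder are hypothetical names, and the helper throws instead of silently ignoring a false from mkdirs or rename.

```scala
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: moves a whole Spark output folder between stage directories.
object StageMover {
  def moveFolder(fs: FileSystem, folder: Path, stageDir: Path): Path = {
    // Make sure the stage directory exists; mkdirs also creates missing ancestors.
    if (!fs.exists(stageDir) && !fs.mkdirs(stageDir))
      throw new IOException(s"could not create $stageDir")
    val target = new Path(stageDir, folder.getName)
    // rename signals most failures with `false`, so turn that into an exception.
    if (!fs.rename(folder, target))
      throw new IOException(s"rename failed: $folder -> $target")
    target
  }
}
```

For the raw, process, archive flow described in the question, moveFolder(fs, new Path("/data/analytics/raw/CT_Click_Basic.csv"), new Path("/data/analytics/process")) could be followed later by moving the returned path into /data/analytics/archive. One caveat: if a folder with the same name already exists in the stage directory, HDFS may move the source inside it rather than replace it, so reruns might need a cleanup step.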