I am working on a Scala-based Apache Spark implementation that onboards data from a remote location into HDFS and then ingests it from HDFS into Hive tables.
My first Spark job has already onboarded the files into HDFS at this location:
hdfs://sandbox.hortonworks.com:8020/data/analytics/raw/ folder
After onboarding CT_Click_Basic.csv and CT_Click_Basic1.csv.gz, HDFS contains the files listed below. Each file name from the shared location becomes a folder name here, and its content is stored in part-xxxxx files:
[root@sandbox ~]# hdfs dfs -ls /data/analytics/raw/*/
Found 3 items
-rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS
-rw-r--r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00000
-rw-r--r-- 3 chauhan.bhupesh hdfs 8395 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00001
Found 2 items
-rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS
-rw-r--r-- 3 chauhan.bhupesh hdfs 16588 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000
Now, using another Spark job, I want to move these files as they are from the /raw folder to /process, and finally to the /archive folder in HDFS, depending on the tasks performed at each stage.
To do that, I first fetch the list of all files under the /raw folder with the following code:
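import org.apache.hadoop.fs.Path
import scala.collection.mutable.ListBuffer

// Collects every file under filePath into filePaths.
// Sub-folders are visited only when recursiveTraverse is true.
def listAllFilesFolderInDir(filePath: String, recursiveTraverse: Boolean, filePaths: ListBuffer[Path]): ListBuffer[Path] = {
  val files = GlobalContext.hdfs.listStatus(new Path(filePath))
  files.foreach { fileStatus =>
    if (!fileStatus.isDirectory()) {
      filePaths += fileStatus.getPath()
    } else if (recursiveTraverse) {
      listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
    }
  }
  filePaths
}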
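As an aside, Hadoop's FileSystem also offers listFiles(path, recursive), which walks the tree itself and returns files only. The sketch below is a possible alternative to the hand-written recursion, not the code used in this post; the object name ListFilesSketch and the helper allFiles are made up for illustration, and the FileSystem instance is assumed to come from wherever the job already obtains it.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ListBuffer

object ListFilesSketch {
  // Hypothetical helper: returns the paths of all files under dir.
  // With recursive = true, files in nested folders (e.g. the part-xxxxx files) are included.
  def allFiles(fs: FileSystem, dir: String, recursive: Boolean): List[Path] = {
    val it = fs.listFiles(new Path(dir), recursive) // RemoteIterator[LocatedFileStatus]
    val out = ListBuffer[Path]()
    while (it.hasNext) out += it.next().getPath
    out.toList
  }
}
```

Like the function above, this also returns the _SUCCESS marker files, so they may need to be filtered out if they should not be moved.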
I then use the following lines to rename/move the files from the /raw folder to the /process folder:
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs = <Getting hdfs FileSystem Instance Here>
for (path <- pathArray) {
  var pathSplit = path.toString().split("/")
  var pathSplitSize = pathSplit.size
  val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
  val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
  fs.rename(new Path(rawFileName), new Path(processFileName))
}
However, this code does not move/rename the files. While debugging, I found that fs.rename() returns false.
Please note: the rename/move works when I manually copy a file, e.g. CT.csv [or any other file], into the /data/analytics/raw folder and then run fs.rename() on it, but it does not work for the part-xxxxx files.
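One way to narrow this down is to print the exact source and target strings that the loop hands to fs.rename(). The sketch below reproduces only the string arithmetic from the loop, without touching HDFS; the object RenameTargets and its targets helper are hypothetical names introduced for this illustration.

```scala
object RenameTargets {
  // Hypothetical helper: mirrors how the loop builds rawFileName and processFileName
  // from the last two segments of a listed path.
  def targets(listed: String, inputDir: String, outputDir: String): (String, String) = {
    val parts = listed.split("/")
    val folder = parts(parts.length - 2) // e.g. CT_Click_Basic.csv
    val file = parts(parts.length - 1)   // e.g. part-00000
    (s"$inputDir/$folder/$file", s"$outputDir/$folder/$file")
  }

  def main(args: Array[String]): Unit = {
    val listed = "hdfs://sandbox.hortonworks.com:8020/data/analytics/raw/CT_Click_Basic.csv/part-00000"
    val (src, dst) = targets(listed, "/data/analytics/raw", "/data/analytics/process")
    println(src) // /data/analytics/raw/CT_Click_Basic.csv/part-00000
    println(dst) // /data/analytics/process/CT_Click_Basic.csv/part-00000
    // Parent folder of the target; worth checking whether it exists on HDFS
    println(dst.substring(0, dst.lastIndexOf('/'))) // /data/analytics/process/CT_Click_Basic.csv
  }
}
```

With the paths printed, each one can be checked by hand, for example with hdfs dfs -ls, before the rename is attempted.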
Is there something I am missing?
Any quick help will be appreciated.
Regards, Bhupesh
I finally found the issue. I was trying to rename /data/analytics/raw/folder.csv/part-xxxxx to /data/analytics/process/folder.csv/part-xxxxx, where /data/analytics/process existed in HDFS but its "folder.csv" sub-folder did not. Because the parent folder of the target was missing, fs.rename() returned false. I added a check that creates the target folder when it does not exist, and the rename now works:
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs = <Getting hdfs FileSystem Instance Here>
for (path <- pathArray) {
  var pathSplit = path.toString().split("/")
  var pathSplitSize = pathSplit.size
  val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
  var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2)
  var processFolderPath = new Path(processFolderName)
  if (!fs.exists(processFolderPath)) {
    fs.mkdirs(processFolderPath)
  }
  val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1)
  fs.rename(new Path(rawFileName), new Path(processFileName))
}
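Since fs.rename() reports this kind of failure only through its Boolean return value, it can help to wrap the two steps in a small helper that fails loudly. The following is a minimal sketch under assumptions, not the code from this post: the object SafeMove and its move method are hypothetical names, and FileSystem.get(new Configuration()) assumes the cluster configuration is on the classpath.

```scala
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object SafeMove {
  // Hypothetical helper: makes sure the target's parent folder exists, then renames.
  // Throws instead of silently returning false.
  def move(fs: FileSystem, src: Path, dst: Path): Unit = {
    val parent = dst.getParent
    // mkdirs creates missing ancestors as well; on HDFS it is expected to
    // return true when the folder is already there
    if (parent != null && !fs.mkdirs(parent))
      throw new IOException(s"Could not create $parent")
    if (!fs.rename(src, dst))
      throw new IOException(s"rename($src, $dst) returned false")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: core-site.xml / hdfs-site.xml are on the classpath
    val fs = FileSystem.get(new Configuration())
    move(fs,
      new Path("/data/analytics/raw/CT_Click_Basic.csv/part-00000"),
      new Path("/data/analytics/process/CT_Click_Basic.csv/part-00000"))
  }
}
```

If every part file of a folder should move together, renaming the folder itself (for example /data/analytics/raw/CT_Click_Basic.csv to /data/analytics/process/CT_Click_Basic.csv) in a single fs.rename() call may be simpler, provided /data/analytics/process exists and the target folder does not.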