Search code examples
rmapreducehdfsrhadoop

How to input HDFS file into R mapreduce for processing and get the result into HDFS file


I have a question similar to the below link in stackoverflow

R+Hadoop: How to read CSV file from HDFS and execute mapreduce?

I am tring to read a file from location "/somnath/logreg_data/ds1.10.csv" in HDFS, reduce its number of columns from 10 to 5 and then write to another location "/somnath/logreg_data/reduced/ds1.10.reduced.csv" in HDFS using the below transfer.csvfile.hdfs.to.hdfs.reduced function.

transfer.csvfile.hdfs.to.hdfs.reduced("hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", "hdfs://10.5.5.82:8020/somnath/logreg_data/reduced/ds1.10.reduced.csv", 5)

The function definition is

transfer.csvfile.hdfs.to.hdfs.reduced =
                function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
                        #local.df = data.frame()
                        #hdfs.get(hdfsFilePath, local.df)
                        #to.dfs(local.df)
                        #r.file <- hdfs.file(hdfsFilePath,"r")
                        transfer.reduced.map =
                                        function(.,M) {
                                                label <- M[,dim(M)[2]]
                                                reduced.predictors <- M[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))
                                        }
                        reduced.values =
                             values(
                                     from.dfs(
                                        mapreduce(
                                          input = from.dfs(hdfsFilePath),
                                          input.format = "native",
                                          map = function(.,M) {
                                                label <- M[,dim(M)[2]]
                                                print(label)
                                                reduced.predictors <- M[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))}
                        )))
                        write.table(reduced.values, file="/root/somnath/reduced.values.csv")
                        w.file <- hdfs.file(hdfsWritePath,"w")
                        hdfs.write(reduced.values,w.file)
                        #to.dfs(reduced.values)
                }

But I am receiving an error

Error in file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
  cannot open the connection
Calls: transfer.csvfile.hdfs.to.hdfs.reduced ... make.keyval.reader -> do.call -> <Anonymous> -> file
In addition: Warning message:
In file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
  cannot open file 'hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv': No such file or directory
Execution halted

OR

When I am trying to load a file from hdfs using the below commands, I am getting the below error:

> x <- hdfs.file(path="hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",mode="r")
Error in hdfs.file(path = "hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",  :
  attempt to apply non-function

Any help will be highly appreciated

Thanks


Solution

  • Basically found a solution to the problem that I stated above.

    r.file <- hdfs.file(hdfsFilePath,"r")
    from.dfs(
        mapreduce(
             input = as.matrix(hdfs.read.text.file(r.file)),
             input.format = "csv",
             map = ...
    ))
    

    Below is the entire modified function:

    transfer.csvfile.hdfs.to.hdfs.reduced =
                    function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
                            hdfs.init()
                            #local.df = data.frame()
                            #hdfs.get(hdfsFilePath, local.df)
                            #to.dfs(local.df)
                            r.file <- hdfs.file(hdfsFilePath,"r")
                            transfer.reduced.map =
                                            function(.,M) {
                                                    numRows <- length(M)
                                                    M.vec.elems <-unlist(lapply(M,
                                                                                    function(x) strsplit(x, ",")))
                                                    M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
                                                    label <- M.matrix[,dim(M.matrix)[2]]
                                                    reduced.predictors <- M.matrix[,1:reducedCols]
                                                    reduced.M <- cbind(reduced.predictors, label)
                                                    keyval(
                                                         1,
                                                         as.numeric(reduced.M))
                                            }
                            reduced.values =
                                 values(
                                         from.dfs(
                                            mapreduce(
                                              input = as.matrix(hdfs.read.text.file(r.file)),
                                              input.format = "csv",
                                              map = function(.,M) {
                                                    numRows <- length(M)
                                                    M.vec.elems <-unlist(lapply(M,
                                                           function(x) strsplit(x, ",")))
                                                    M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
                                                    label <- M.matrix[,dim(M.matrix)[2]]
                                                    reduced.predictors <- M.matrix[,1:reducedCols]
                                                    reduced.M <- cbind(reduced.predictors, label)
                                                    keyval(
                                                         1,
                                                         as.numeric(reduced.M)) }
                            )))
                            write.table(reduced.values, file="/root/somnath/reduced.values.csv")
                            w.file <- hdfs.file(hdfsWritePath,"w")
                            hdfs.write(reduced.values,w.file)
                            hdfs.close(r.file)
                            hdfs.close(w.file)
                            #to.dfs(reduced.values)
                    }
    

    Hope this helps and don't forget to give points if you find it useful. Thanks ahead