Tags: r, hadoop, mapreduce, hadoop-streaming

Very slow/stuck hadoop streaming with R


I am trying to write a custom map-reduce job in R. Here is my mapper function:

#! /usr/bin/env Rscript

input <- file("stdin", "r")

while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
  # in case of empty lines
  if(nchar(line) == 0) break

  # split line into data
  data = unlist(strsplit(line, ","))

  # output scores with cat()
  cat(data[2],"|",data[3],"|",data[4]
      ,"\t"  # reduce key followed by tab
      ,paste(data[1],paste(unlist(data[5:length(data)]),collapse=","),sep = ",")    # all other fields separated by commas
      ,"\n",sep='') # line break
}

close(input)
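
For illustration, running the body of the mapper on one invented record (field names and values below are made up, not taken from the real data) shows the layout it emits: columns 2-4 joined with "|" form the key, and column 1 plus the remaining columns form the comma-separated value.

# quick local check of the mapper logic on a single invented record
line <- "item42,DEPT01,CLAS07,SBCL03,12,9,41.88,-87.62"
data <- unlist(strsplit(line, ","))
cat(data[2],"|",data[3],"|",data[4],"\t",
    paste(data[1],paste(unlist(data[5:length(data)]),collapse=","),sep=","),
    "\n",sep='')
# prints (tab between key and value): DEPT01|CLAS07|SBCL03   item42,12,9,41.88,-87.62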

So essentially the combination of three columns is my key here; the remaining columns become the value. Once all data belonging to a particular key reaches a single reducer node, it is processed by the reducer code below:

# reducer: mapper output arrives sorted by key, so consecutive lines sharing a
# key are collected into one data.frame and passed to BuildDTnProcess()
first_line <- TRUE
first_time <- TRUE
prev_id <- ""
input <- file("stdin", "r")
while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
  # in case of empty lines
  if(nchar(line) == 0) break

  # skip the very first input line
  if(first_time == TRUE){
    first_time = FALSE
    next
  }

  # split into key (before the tab) and value (after the tab)
  id <- unlist(strsplit(line,"\t"))[1]
  data0 <- unlist(strsplit(line,"\t"))[2]

  # turn the comma-separated value into a one-row data.frame
  data1 = data.frame(t(unlist(strsplit(data0, ","))),stringsAsFactors=FALSE)

  colnames(data1) = c('ITEM_I','BOH','EOH','LATTD_I','LNGTD_I')

  # attach the three key components as columns
  data1$DEPT = strsplit(id,"\\|")[[1]][1]
  data1$CLAS = strsplit(id,"\\|")[[1]][2]
  data1$SBCL = strsplit(id,"\\|")[[1]][3]

  if(prev_id==id | first_line==T){
    # same key as the previous line: append this row to the running data.frame
    if(!exists("base_data")){
      base_data <- rbind(data1)
      first_line <- F
    }else{
      base_data <- rbind(base_data,data1)
    }
  }else{
    # key changed: process everything collected for the previous key,
    # then start a fresh data.frame for the new key
    if(!exists("results")){
      results <- BuildDTnProcess(base_data)
      base_data <- rbind(data1)
    }else{
      results <- rbind(results,BuildDTnProcess(base_data))
      base_data <- data1
    }
  }
  prev_id <- id
}
close(input)

# process the rows collected for the last key
if(!exists("results")){
  results <- BuildDTnProcess(base_data)
}else{
  results <- rbind(results,BuildDTnProcess(base_data))
}
base_data <- NULL

So I am piling up all records belonging to a single key into a data frame (and starting a new data frame whenever a new key appears). That data frame, composed of the observations for one key, is then passed to a function BuildDTnProcess, which performs some processing on it; its output is accumulated in results.

I observed that this code gets stuck for a couple of days and then gets killed, so I started adding code blocks one at a time to identify the bottleneck. Everything up to and including data1$SBCL = strsplit(id,"\\|")[[1]][3] runs fine, but when I add

if(prev_id==id | first_line==T){
  if(!exists("base_data")){
    base_data <- rbind(data1)
    first_line <- F
  }else{
    base_data <- rbind(base_data,data1)
  }
}

then it becomes very slow. In the logs, where a complete run (about 20 minutes) used to progress at

2016-05-11 14:57:26,160 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=200000/0/0 in:1169=200000/171 [rec/s] out:0=0/171 [rec/s]
2016-05-11 14:58:47,346 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=300000/0/0 in:1185=300000/253 [rec/s] out:0=0/253 [rec/s]
2016-05-11 15:00:09,503 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=400000/0/0 in:1194=400000/335 [rec/s] out:0=0/335 [rec/s]
2016-05-11 15:01:33,969 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=500000/0/0 in:1193=500000/419 [rec/s] out:0=0/419 [rec/s]
2016-05-11 15:02:54,523 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=600000/0/0 in:1200=600000/500 [rec/s] out:0=0/500 [rec/s]

the job now slows down and gets stuck (not completing even after days) at

2016-05-11 13:51:17,543 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=10000/0/0 in:87=10000/114 [rec/s] out:0=0/114 [rec/s]
2016-05-11 16:58:16,552 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=100000/0/0 in:8=100000/11333 [rec/s] out:0=0/11333 [rec/s]

Is there something important I am missing here?

PS: while doing this analysis I removed all of the code below the bottleneck block mentioned above.


Solution

  • Writing the answer myself, since I found the reason and this question still has no reply or even a comment. The root cause of the poor performance was the rbind operation: each rbind call copies the entire base data.frame, so appending rows gets more and more expensive as the data.frame grows. More details are in Growing a data.frame in a memory-efficient manner.

    I implemented a data.table-based version with pre-allocation and it works great; a rough sketch of the idea follows.
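
    A minimal sketch of that idea, assuming the data.table package is available: buffer the rows of the current key in a pre-allocated list and bind them once per key with rbindlist(), instead of calling rbind() on a growing data.frame for every record. Column names are taken from the reducer above, BuildDTnProcess is assumed to be defined elsewhere, and the buffer size is an arbitrary guess; treat this as an illustration of the approach rather than the exact code used.

    #! /usr/bin/env Rscript
    library(data.table)

    input <- file("stdin", "r")

    buf <- vector("list", 10000)   # pre-allocated buffer for the rows of one key
    nbuf <- 0L                     # number of rows currently buffered
    prev_id <- ""
    results <- list()              # per-key outputs of BuildDTnProcess()

    flush_key <- function(buf, nbuf) {
      # one rbindlist() call per key instead of one rbind() call per record
      BuildDTnProcess(rbindlist(buf[seq_len(nbuf)]))
    }

    while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
      if(nchar(line) == 0) break

      parts  <- unlist(strsplit(line, "\t"))
      id     <- parts[1]
      fields <- unlist(strsplit(parts[2], ","))
      key    <- unlist(strsplit(id, "\\|"))

      # key changed: process the buffered rows and reset the buffer counter
      if(prev_id != id && nbuf > 0L){
        results[[length(results) + 1L]] <- flush_key(buf, nbuf)
        nbuf <- 0L
      }

      nbuf <- nbuf + 1L
      buf[[nbuf]] <- list(ITEM_I = fields[1], BOH = fields[2], EOH = fields[3],
                          LATTD_I = fields[4], LNGTD_I = fields[5],
                          DEPT = key[1], CLAS = key[2], SBCL = key[3])
      prev_id <- id
    }
    close(input)

    # process the rows buffered for the last key, then combine all per-key outputs
    if(nbuf > 0L) results[[length(results) + 1L]] <- flush_key(buf, nbuf)
    results <- rbindlist(results)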