rmr2 is duplicating the keys from my mapper


For some reason rmr2 seems to be improperly processing keys in certain circumstances, duplicating the key for each value.

I am using R version 3.1.1, the 64-bit version, under Windows 7. My rmr version is rmr2_2.3.0.

I am using the local mode by setting rmr.options(backend="local").

I have a very simple text file with these contents:

a|1|blue
b|2|green
c|1|green
d|3|blue
e|2|yellow

I can easily retrieve these contents with the following map-reduce job where f is the path to my file:

library(rmr2)

from.dfs(
  mapreduce(
  input=f,
  input.format="text",
  map = function(k,v) keyval(k,v)))

The output, as expected, is:

$key
NULL

$val
[1] "a|1|blue"   "b|2|green"  "c|1|green"  "d|3|blue"   "e|2|yellow"

I can run another map-reduce job that assigns the length of each line as the key passed to the reducer (str_length comes from the stringr package, which must be loaded first):

from.dfs(
  mapreduce(
  input=f,
  input.format="text",
   map = function(k,v) keyval(str_length(v),v)))

As expected, the output is:

$key
[1]  8  8  9  9 10

$val
[1] "a|1|blue"   "d|3|blue"   "b|2|green"  "c|1|green"  "e|2|yellow"

Instead of the length of the value, I can use its first character as the key:

from.dfs(
  mapreduce(
  input=f,
  input.format="text",
  map = function(k,v) keyval(substr(v,1,1),v)))

The output, again as expected, is:

$key
[1] "a" "b" "c" "d" "e"

$val
[1] "a|1|blue"   "b|2|green"  "c|1|green"  "d|3|blue"   "e|2|yellow"

So far, so good. Now I want to split the value and use the first field. My code is:

from.dfs(
  mapreduce(
  input=f,
  input.format="text",
  map = function(k,v) keyval(unlist(strsplit(v,'\\|'))[1],v)))

This time the output, quite unexpectedly, is:

$key
[1] "a" "a" "a" "a" "a"

$val
[1] "a|1|blue"   "b|2|green"  "c|1|green"  "d|3|blue"   "e|2|yellow"

I would expect to see the same output as the previous example using substring, but instead of the key vector being "a" "b" "c" "d" "e" it is just the first key repeated 5 times, "a" "a" "a" "a" "a".

I can change the field I am trying to get, for example to the third field:

from.dfs(
  mapreduce(
  input=f,
  input.format="text",
  map = function(k,v) keyval(unlist(strsplit(v,'\\|'))[3],v)))

and again the first key is repeated instead of a unique key for every value. The output is:

$key
[1] "blue" "blue" "blue" "blue" "blue"

$val
[1] "a|1|blue"   "b|2|green"  "c|1|green"  "d|3|blue"   "e|2|yellow"

I am at my wits' end with this. What is happening? Is this a bug in rmr2, or something else?

UPDATE: I tried the same examples on an Ubuntu 12.04 system running rmr2_3.1.1 against a fully distributed HDP2 cluster and got nearly the same results. For the examples that return the expected results, the only difference is that the order of the key-value pairs is not the same as in the file (which is understandable). For the example that had a problem, I get even weirder results:

$key
[1] "d" "d" "a" "a" "a"

$val
[1] "d|3|blue"   "e|2|yellow" "a|1|blue"   "b|2|green"  "c|1|green"

Solution

  • Answering my own question. The answer involves understanding (or rather guessing) the structure of the (k,v) arguments passed to the map function. They appear to be vectors of values. That is, the map function is called a single time with a vector containing all the values of the split, which is possible because Hadoop streaming is being used. Contrast this with a typical Java Hadoop map function, which is called once for each value and receives only one value per invocation. Because v is a vector, unlist(strsplit(v,'\\|')) flattens the fields of every line into one long vector, so [1] picks just one field from the first line, and that single key ends up repeated for every value.

    So the solution is to use:

    from.dfs(
      mapreduce(
        input=f,
        input.format="text",
        # reshape the flattened fields into one row per line, then take column 1
        map = function(k,v) keyval(
          matrix(unlist(strsplit(v,'\\|')), nrow=length(v), byrow=TRUE)[,1],
          v)))
    

    which produces the desired result:

    $key
    [1] "d" "e" "a" "b" "c"
    
    $val
    [1] "d|3|blue"   "e|2|yellow" "a|1|blue"   "b|2|green"  "c|1|green"
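
The behavior described in the answer can be reproduced without rmr2 or Hadoop. The following is a minimal base-R sketch, not the library's internals: `lines` stands in for the vector `v` that the map function receives, `first_field` is a hypothetical helper name, and the `rep()` call only imitates the key repetition seen in the outputs above (an assumption about how keyval treats a length-one key). The vapply variant is an alternative to the matrix approach that does not assume every line has the same number of fields.

```r
# `lines` plays the role of `v` inside the map function (illustrative name).
lines <- c("a|1|blue", "b|2|green", "c|1|green", "d|3|blue", "e|2|yellow")

# The problematic expression: strsplit() returns one character vector per
# line, unlist() flattens them into a single vector of 15 fields, and [1]
# then picks only the first field of the first line.
bad_key <- unlist(strsplit(lines, "\\|"))[1]
print(bad_key)

# The approach from the answer: one matrix row per line, first column as key.
# This assumes every line has the same number of fields.
fields <- matrix(unlist(strsplit(lines, "\\|")),
                 nrow = length(lines), byrow = TRUE)
matrix_key <- fields[, 1]

# Alternative (hypothetical helper): take the first field of each line
# separately, which also works when field counts differ between lines.
first_field <- function(x) {
  vapply(strsplit(x, "\\|"), function(p) p[1], character(1), USE.NAMES = FALSE)
}
vapply_key <- first_field(lines)

print(matrix_key)
print(vapply_key)

# Simulate two input splits, each handled by its own map call. rep() imitates
# a single key being repeated for every value in that call (assumption).
splits <- list(lines[4:5], lines[1:3])
simulated <- unlist(lapply(splits, function(s) {
  rep(unlist(strsplit(s, "\\|"))[1], length(s))
}))
print(simulated)
```

Under these assumptions, the simulated two-split run yields the same "d" "d" "a" "a" "a" pattern reported in the UPDATE, which is consistent with the map function being called once per split rather than once per line.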