java hadoop mapreduce

MapReduce filtering to get customers not in order list?


I'm currently learning MapReduce and trying to figure out how to code this in Java.

There are two input files, customers.txt and car_orders.txt:

customers.txt
===================
12345 Peter
12346 Johnson
12347 Emily
12348 Brad

[custNum, custName]

car_orders.txt
===================
00034 12345 23413
00035 12345 94832
00036 12346 8532
00037 12348 9483

[orderNo, custNum, carValue]

The idea is to apply MapReduce and output the customers who did not place a car order; in the scenario above, that is Emily.

Output:
===================
12347 Emily

This is what I have in mind:

Map phase:
1. Read the data inside customers.txt, get key-value pair, (custNum, custName)
2. Read the data inside car_orders.txt, get key-value pair, (custNum, [orderNo, carValue])
3. Partition into groups based on the key

Reduce phase:
1. For each key, compare key-value A (the customer side) with key-value B (the order side)
2. If key-value B is NULL, output key-value A

Any help in the form of pseudocode for this application will be greatly appreciated.


Solution

  • It's basically a reduce-side join where you discard the keys that have both sides filled, just as you put it in your pseudocode.

    The code for that in Hadoop MapReduce would look like this:

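    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    class TextMap extends Mapper<LongWritable, Text, Text, Text> {

       @Override
       public void map(LongWritable key, Text value, Context context)
             throws IOException, InterruptedException {
           String[] a = value.toString().trim().split("\\s+"); // assuming whitespace separation
           if (a.length == 2) {
              // customers.txt line: [custNum, custName] -> (custNum, custName)
              context.write(new Text(a[0]), new Text(a[1]));
           } else if (a.length == 3) {
              // car_orders.txt line: [orderNo, custNum, carValue] -> (custNum, carValue)
              context.write(new Text(a[1]), new Text(a[2]));
           }
       }
    }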
    

    That would emit:

    12345 Peter
    12346 Johnson
    12347 Emily
    12348 Brad
    12345 23413
    12345 94832
    12346 8532
    12348 9483
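
    Before the reducer runs, the framework groups these pairs by key. As a rough illustration of what each reduce call then receives, here is a minimal plain-Java sketch of that grouping. It uses no Hadoop classes, and the names ShuffleSketch, group and sampleMapOutput are made up for this example:

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Plain-Java stand-in for the shuffle phase; not Hadoop code.
    class ShuffleSketch {

        // Groups (key, value) pairs by key, with keys in sorted order,
        // roughly as the framework does before calling reduce().
        static Map<String, List<String>> group(List<String[]> pairs) {
            Map<String, List<String>> grouped = new TreeMap<>();
            for (String[] kv : pairs) {
                grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
            }
            return grouped;
        }

        // The eight pairs listed above, in the same order.
        static List<String[]> sampleMapOutput() {
            return Arrays.asList(
                new String[] {"12345", "Peter"},
                new String[] {"12346", "Johnson"},
                new String[] {"12347", "Emily"},
                new String[] {"12348", "Brad"},
                new String[] {"12345", "23413"},
                new String[] {"12345", "94832"},
                new String[] {"12346", "8532"},
                new String[] {"12348", "9483"});
        }

        public static void main(String[] args) {
            // Prints one line per key with the list of values grouped under it.
            group(sampleMapOutput()).forEach((k, v) -> System.out.println(k + " -> " + v));
        }
    }
    ```

    In this sketch only key 12347 ends up with a single value (Emily); every other key also carries at least one car value. Note that in a real job the order of the values within one key is not guaranteed.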
    

    So the reducer would look fairly simple:

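    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    class TextReduce extends Reducer<Text, Text, Text, Text> {

       @Override
       public void reduce(Text key, Iterable<Text> values, Context context)
             throws IOException, InterruptedException {
          List<String> vals = new ArrayList<>();
          for(Text t : values) {
             vals.add(t.toString()); // copy the string, Hadoop reuses the Text object
          }

          // a single value means only the customer record arrived, i.e. no orders
          // (assumes every order refers to a customer in customers.txt)
          if(vals.size() == 1) {
             context.write(key, new Text(vals.get(0)));
          }
       }
    }
    

    And that should emit just Emily's record, 12347 Emily (key and value are tab-separated by default).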
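
    One caveat: the `vals.size() == 1` check relies on every order pointing at a customer that exists in customers.txt. An order for an unknown custNum that appears only once would also have exactly one value. A common way around this is to tag each value with the file it came from. Below is a minimal plain-Java sketch of that idea, simulating map, shuffle and reduce without any Hadoop classes. The "C:" and "O:" prefixes, the class name TaggedAntiJoinSketch and the extra order line for customer 99999 are all assumptions made for this example:

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Plain-Java sketch of a tagged reduce-side anti-join; not Hadoop code.
    class TaggedAntiJoinSketch {

        // Map step: emit (custNum, taggedValue). "C:" and "O:" are made-up tags.
        static String[] map(String line) {
            String[] a = line.trim().split("\\s+");
            if (a.length == 2) {
                return new String[] {a[0], "C:" + a[1]}; // customers.txt: custNum, custName
            } else if (a.length == 3) {
                return new String[] {a[1], "O:" + a[2]}; // car_orders.txt: orderNo, custNum, carValue
            }
            return null; // malformed line, skipped
        }

        // Reduce step: keep the customer only if no order value arrived for the key.
        static String reduce(String custNum, List<String> values) {
            String name = null;
            boolean hasOrder = false;
            for (String v : values) {
                if (v.startsWith("C:")) {
                    name = v.substring(2);
                } else if (v.startsWith("O:")) {
                    hasOrder = true;
                }
            }
            return (name != null && !hasOrder) ? custNum + " " + name : null;
        }

        // Runs map, groups by key (the shuffle), then reduces each group.
        static List<String> run(List<String> lines) {
            Map<String, List<String>> grouped = new TreeMap<>();
            for (String line : lines) {
                String[] kv = map(line);
                if (kv != null) {
                    grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
                }
            }
            List<String> out = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
                String r = reduce(e.getKey(), e.getValue());
                if (r != null) {
                    out.add(r);
                }
            }
            return out;
        }

        public static void main(String[] args) {
            List<String> lines = Arrays.asList(
                "12345 Peter", "12346 Johnson", "12347 Emily", "12348 Brad",
                "00034 12345 23413", "00035 12345 94832",
                "00036 12346 8532", "00037 12348 9483",
                "00038 99999 1000"); // hypothetical order for an unknown customer
            run(lines).forEach(System.out::println);
        }
    }
    ```

    With the sample data plus the hypothetical orphan order, this prints only 12347 Emily, because key 99999 never receives a customer value. In an actual Hadoop job the same tagging could be done in the mapper; one option, if the field counts are not a reliable way to tell the files apart, is to register a separate mapper per input file via MultipleInputs.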