This is my first implementation in Hadoop. I am trying to implement my algorithm for a probabilistic dataset in MapReduce. In my dataset, the last column holds an id (the number of unique ids in the dataset equals the number of nodes in my cluster). I have to divide my dataset based on this column value, and each set of records should be processed by one node in my cluster.
For example, if I have three nodes in my cluster, then for the dataset below one node should process all the records with id=1, another node the records with id=2, and another node the records with id=3.
name time dept id
--------------------
b1 2:00pm z1 1
b2 3:00pm z2 2
c1 4:00pm y2 1
b3 3:00pm z3 3
c4 4:00pm x2 2
My map function should take each split as input and process it in parallel on each node.
I am just trying to understand which approach is possible in Hadoop: either feed this dataset as input to my map function and pass an additional argument so the map can split the data based on the id value, or split the data beforehand into "n" (number of nodes) subsets and load them onto the nodes. If the latter is the correct approach, how is it possible to split the data based on a value and load it onto different nodes? From what I have read, Hadoop splits the data into blocks based on a specified block size, so how can we specify a particular condition while loading? To add to this, I am writing my program in Python.
Could someone please advise? Thanks.
The simplest thing for you would probably be to have the mapper output each record with the id as the key. That guarantees that one reducer will get all the records for a specific id, and you can then do your processing in the reducer phase.
For example,
Input data:
b1 2:00pm z1 1
b2 3:00pm z2 2
c1 4:00pm y2 1
b3 3:00pm z3 3
c4 4:00pm x2 2
Mapper code:
#!/usr/bin/env python
import sys

for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[-1]              # the id is the last column
    print key + "\t" + line     # emit the id as the key, the full record as the value
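You can check the mapper on its own before submitting anything to the cluster. Assuming the mapper above is saved as mapper.py and the tab-separated records are in input.txt (both names are just for illustration), something like

cat input.txt | python mapper.py

should produce the map output shown below.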
Map output:
1 b1 2:00pm z1 1
2 b2 3:00pm z2 2
1 c1 4:00pm y2 1
3 b3 3:00pm z3 3
2 c4 4:00pm x2 2
Reducer 1 input:
1 b1 2:00pm z1 1
1 c1 4:00pm y2 1
Reducer 2 input:
2 b2 3:00pm z2 2
Reducer 3 input:
3 b3 3:00pm z3 3
Reducer code:
#!/usr/bin/env python
import sys

for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    orig_line = "\t".join(cols[1:])   # drop the key added by the mapper, keep the original record
    # do stuff...
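You can also simulate the whole job locally, with sort standing in for Hadoop's shuffle-and-sort phase (again assuming the file names mapper.py, reducer.py and input.txt used in the examples here):

cat input.txt | python mapper.py | sort -k1,1 | python reducer.py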
Note that this way a single reducer might get several keys, but the data will be sorted by key, and you can control the number of reducers with the mapred.reduce.tasks option.
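On the cluster, the job would be submitted through Hadoop Streaming along these lines (the jar location and the HDFS paths are placeholders you need to adjust; on newer Hadoop versions the option is spelled mapreduce.job.reduces):

hadoop jar /path/to/hadoop-streaming.jar \
    -D mapred.reduce.tasks=3 \
    -input /hdfs/path/to/input \
    -output /hdfs/path/to/output \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py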
EDIT: If you want to collect the data per key in the reducer, you can do something like this (fill in process_data with your own processing):
#!/usr/bin/env python
import sys

def process_data(key_id, data_list):
    # data_list has all the lines for key_id
    pass

last_key = None
data = []
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[0]
    if last_key and key != last_key:
        # a new key starts, so process everything collected for the previous key
        process_data(last_key, data)
        data = []
    orig_line = "\t".join(cols[1:])
    data.append(orig_line)
    last_key = key
if last_key:
    process_data(last_key, data)   # don't forget the last key
If you aren't worried about running out of memory in the reducer step, you can simplify the code like this:
#!/usr/bin/env python
import sys
from collections import defaultdict

def process_data(key_id, data_list):
    # data_list has all the lines for key_id
    pass

all_data = defaultdict(list)
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[0]
    orig_line = "\t".join(cols[1:])
    all_data[key].append(orig_line)   # buffer every record, grouped by key

for key, data in all_data.iteritems():
    process_data(key, data)
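If you would rather not buffer all keys in memory or track last_key by hand, another option is itertools.groupby from the standard library; it only relies on records with the same key arriving consecutively, which Hadoop's sort of the map output guarantees. A minimal sketch of that variant:

#!/usr/bin/env python
import sys
from itertools import groupby

def process_data(key_id, data_list):
    # data_list has all the lines for key_id
    pass

# group consecutive lines that share the same key (the first tab-separated column)
for key, lines in groupby(sys.stdin, key=lambda l: l.split("\t", 1)[0]):
    data = ["\t".join(line.strip().split("\t")[1:]) for line in lines]
    process_data(key, data)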