Tags: java, csv, hadoop, mapreduce

Collecting specific data from CSV file using Hadoop MapReduce


I'm in need of some assistance with a MapReduce program. I have a CSV file with 15 total columns. I'm trying to extract data from two of the columns (Market and Amount Funded) based on the value (Year) of a third column.

As of now, my program outputs the data from the two columns (Market and Amount Funded) for each entry. What I would like it to output instead is the total Amount Funded for each Market, either for a specified year or for a specified range of years.

I'll post my mapper code below along with an example data entry. Any assistance will be greatly appreciated!

import java.io.IOException;
import java.io.StringReader;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import au.com.bytecode.opencsv.CSVReader; // com.opencsv.CSVReader in opencsv 3.x+

public class FundingMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

    private Text market = new Text();
    private Text amount = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Parse the incoming line as a single CSV record
        String line = value.toString();
        CSVReader reader = new CSVReader(new StringReader(line));
        String[] parsedLine = reader.readNext();
        reader.close();

        // Column 15 = Amount Funded, column 3 = Market
        amount.set(parsedLine[15]);
        market.set(parsedLine[3]);

        output.collect(market, amount);
    }
}

/organization/hashoff, #HASHOFF, |Digital Media|Internet|Social Media|, Digital Media, USA, CO, Denver, Denver, /funding-round/669d6203c0374e6cf0e8d10f75ba0b8a, debt_financing, 12/8/14, 2014-12, 2014-Q4, 2014, 455,000

For the above entry, my program will output, with the proper headers, Digital Media and 455,000 for Market and Amount Funded, respectively. I'd like the program to output results based on a specified year or range of years instead.

Here's my job code as well:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class FundingJob {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(FundingJob.class);
        conf.setJobName("Funding Data");

        // Key/value types for map output and final output
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);

        // Map-only job: no reducer is configured yet
        conf.setMapperClass(FundingMapper.class);
        conf.setNumReduceTasks(0);

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

Solution

  • You are currently emitting the market as the key and the amount as the value. I think you should move to a year-market concatenation for the key (a sketch of the modified mapper follows the example output below). With such a key, the map phase will generate a list of (key, value) pairs like this example one:

    2014-DigitalMedia,455000
    2014-OtherMarket,34500
    2014-DigitalMedia,100000
    2015-DigitalMedia,120000
    2015-DigitalMedia,67000
    2015-OtherMarket,15000
    2015-OtherMarket,10000
    ...
    
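    A minimal sketch of that mapper change is below. It keeps the original column indices (3 for Market, 15 for Amount Funded) and assumes the year sits at index 13 of the parsed row, as the sample entry suggests; adjust the indices, and the hypothetical year filter, to your actual file layout. The amount is emitted as an IntWritable so the reducer can sum it.

    import java.io.IOException;
    import java.io.StringReader;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    import au.com.bytecode.opencsv.CSVReader;

    public class FundingMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        private Text yearMarket = new Text();
        private IntWritable amount = new IntWritable();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            CSVReader reader = new CSVReader(new StringReader(value.toString()));
            String[] parsedLine = reader.readNext();
            reader.close();

            // Hypothetical column positions: 13 = year, 3 = Market, 15 = Amount Funded
            String year = parsedLine[13];
            String market = parsedLine[3];
            String rawAmount = parsedLine[15].replaceAll("[^0-9]", "");
            if (rawAmount.isEmpty()) {
                return; // skip rows without a usable amount
            }

            // Optional: restrict to a single year or a range of years here,
            // e.g. if (!year.equals("2014")) return;

            // Composite key "year-market", numeric value for summing in the reducer
            yearMarket.set(year + "-" + market);
            amount.set(Integer.parseInt(rawAmount));
            output.collect(yearMarket, amount);
        }
    }

    If you want a range of years instead of a single year, the same filter can compare the parsed year against lower and upper bounds, for example read from the job configuration.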

    A reducer class could then take each key and its group of values and aggregate the amounts per key, resulting in:

    2014-DigitalMedia,555000
    2014-OtherMarket,34500
    2015-DigitalMedia,187000
    2015-OtherMarket,25000
    

    The code for the reducer could be something like:

    // Sums all amounts that share the same year-market key
    // (requires java.util.Iterator, org.apache.hadoop.io.IntWritable and org.apache.hadoop.mapred.Reducer imports)
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;

            while (values.hasNext()) {
                sum += values.next().get();
            }

            output.collect(key, new IntWritable(sum));
        }
    }


    In the main program you must register it as combiner and reducer (and, because the reducer works on IntWritable values and the job is no longer map-only, remove conf.setNumReduceTasks(0) and change the value classes from Text to IntWritable):

    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
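
    Putting the configuration together, the adjusted driver could look like the sketch below. This is a minimal sketch against the old org.apache.hadoop.mapred API used in the question, assuming Reduce is the nested class shown above; reusing it as the combiner is safe here because summing amounts is associative.

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class FundingJob {

        // The Reduce class shown above would sit here as a nested static class

        public static void main(String[] args) throws IOException {
            JobConf conf = new JobConf(FundingJob.class);
            conf.setJobName("Funding Data");

            // Keys are "year-market" strings, values are funding amounts
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);
            conf.setMapOutputKeyClass(Text.class);
            conf.setMapOutputValueClass(IntWritable.class);

            conf.setMapperClass(FundingMapper.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);
            // Note: the original conf.setNumReduceTasks(0) is dropped so the reducer runs

            FileInputFormat.addInputPath(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));
            JobClient.runJob(conf);
        }
    }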