Tags: java, hadoop, elastic-map-reduce

Hadoop MapReduce mapper programming


import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;


public class ADDMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> output, Reporter r)
            throws IOException {
        String s = value.toString();
        char[] words = s.toCharArray();
        int wno = 0;
        int ino = 0;
        for (int i = 0; i < words.length; i++) {
            String temp = "";
            for (int j = ino; j < words.length; j++) {
                if (words[j] != ' ') {
                    temp += words[j];
                } else {
                    wno = j;
                    if (temp != "") {
                        ino = ino + key; ////// POINT OF ERROR
                        output.collect(new Text(temp), new LongWritable(ino));
                    }
                    temp = "";
                    ino = wno + 1;
                    break;
                }
            }
        }
    }
}

I want to get the index (character offset) of every word, with the output sorted by word.
The above code gives neither the index values nor the words in sorted order. For example, given the input file:

hi how are you hi i am right. how is your job. hi are you ok.

the output should look like:

am 50
are 7,33
hi 0,30,44
how 3,14
...
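
To make the goal concrete, here is a minimal plain-Java sketch of the word-to-offsets mapping being asked for (no Hadoop; the class name and the single-line input are illustrative only). One caveat: indexOf matches substrings, so a short token such as "i" will also match inside a longer word like "hi".

import java.util.*;
import java.util.Map.Entry;

public class IndexDemo {
    public static void main(String[] args) {
        String line = "hi how are you hi i am right. how is your job. hi are you ok.";
        // distinct words, iterated in alphabetical order
        TreeMap<String, List<Integer>> offsets = new TreeMap<String, List<Integer>>();
        for (String word : new TreeSet<String>(Arrays.asList(line.split(" ")))) {
            List<Integer> positions = new ArrayList<Integer>();
            int idx = line.indexOf(word);          // first occurrence
            while (idx >= 0) {
                positions.add(idx);
                idx = line.indexOf(word, idx + 1); // next occurrence
            }
            offsets.put(word, positions);
        }
        for (Entry<String, List<Integer>> e : offsets.entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}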


Solution

  • Please run the code below; it runs fine and produces your expected output.

    Provide the input and output paths as command-line arguments (args[0] and args[1]).

    import java.io.IOException;
    import java.util.*;
    import java.util.Map.Entry;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    
    
    public class IndexCount {

        public static class Map extends MapReduceBase
                implements Mapper<LongWritable, Text, Text, IntWritable> {

            @Override
            public void map(LongWritable key, Text value,
                            OutputCollector<Text, IntWritable> output,
                            Reporter reporter) throws IOException {
                String str = value.toString();
                String[] tokens = str.split(" "); // split the line into words

                // collect the distinct words of this line
                HashMap<String, Integer> uniqueString = new HashMap<String, Integer>();
                for (int i = 0; i < tokens.length; i++) {
                    uniqueString.put(tokens[i], 1);
                }

                // a TreeMap iterates its keys in sorted order
                TreeMap<String, Integer> map = new TreeMap<String, Integer>(uniqueString);
                for (Entry<String, Integer> entry : map.entrySet()) {
                    String word = entry.getKey();
                    // emit every offset at which the word occurs in the line;
                    // note that indexOf matches substrings, so a short token
                    // such as "i" also matches inside a longer word like "hi"
                    int index = str.indexOf(word);
                    while (index >= 0) {
                        output.collect(new Text(word), new IntWritable(index));
                        index = str.indexOf(word, index + 1);
                    }
                }
            }
        }

        public static class Reduce extends MapReduceBase
                implements Reducer<Text, IntWritable, Text, IntWritable> {

            @Override
            public void reduce(Text key, Iterator<IntWritable> values,
                               OutputCollector<Text, IntWritable> output,
                               Reporter reporter) throws IOException {
                // pass every offset through; the framework has already
                // grouped the offsets under their word
                while (values.hasNext()) {
                    output.collect(key, new IntWritable(values.next().get()));
                }
            }
        }

        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(IndexCount.class);
            conf.setJobName("indexfinder");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);
            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);
        }
    }
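
    A couple of notes on why this gives sorted output with correct positions: the TreeMap only orders the pairs emitted from a single map() call; it is the MapReduce shuffle, which sorts keys before they reach the reducer, that guarantees the final output is ordered by word. Also, str.indexOf returns offsets within the current line, while the map key is the byte offset of that line in the file, so for input that spans multiple lines you would emit key.get() + index to get file-global positions. (This is what the ino = ino + key line in the question was attempting; since key is a LongWritable, it needs key.get() rather than the writable itself.) To try it, package the class into a jar and launch it with hadoop jar, passing the input and output directories as the two arguments.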